jsocket.cpp 191 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427
74278427942804281428242834284428542864287428842894290429142924293429442954296429742984299430043014302430343044305430643074308430943104311431243134314431543164317431843194320432143224323432443254326432743284329433043314332433343344335433643374338433943404341434243434344434543464347434843494350435143524353435443554356435743584359436043614362436343644365436643674368436943704371437243734374437543764377437843794380438143824383438443854386438743884389439043914392439343944395439643974398439944004401440244034404440544064407440844094410441144124413441444154416441744184419442044214422442344244425442644274428442944304431443244334434443544364437443844394440444144424443444444454446444744484449445044514452445344544455445644574458445944604461446244634464446544664467446844694470447144724473447444754476447744784479448044814482448344844485448644874488448944904491449244934494449544964497449844994500450145024503450445054506450745084509451045114512451345144515451645174518451945204521452245234524452545264527452845294530453145324533453445354536453745384539454045414542454345444545454645474548454945504551455245534554455545564557455845594560456145624563456445654566456745684569457045714572457345744575457645774578457945804581458245834584458545864587458845894590459145924593459445954596459745984599460046014602460346044605460646074608460946104611461246134614461546164617461846194620462146224623462446254626462746284629463046314632463346344635463646374638463946404641464246434644464546464647464846494650465146524653465446554656465746584659466046614662466346644665466646674668466946704671467246734674467546764677467846794680468146824683468446854686468746884689469046914692469346944695469646974698469947004701470247034704470547064707470847094710471147124713471447154716471747184719472047214722472347244725472647274728472947304731473247334734473547364737473847394740474147424743474447454746474747484749475047514752475347544755475647574758475947604761476247634764476547664767476847694770477147724773477447754776477
74778477947804781478247834784478547864787478847894790479147924793479447954796479747984799480048014802480348044805480648074808480948104811481248134814481548164817481848194820482148224823482448254826482748284829483048314832483348344835483648374838483948404841484248434844484548464847484848494850485148524853485448554856485748584859486048614862486348644865486648674868486948704871487248734874487548764877487848794880488148824883488448854886488748884889489048914892489348944895489648974898489949004901490249034904490549064907490849094910491149124913491449154916491749184919492049214922492349244925492649274928492949304931493249334934493549364937493849394940494149424943494449454946494749484949495049514952495349544955495649574958495949604961496249634964496549664967496849694970497149724973497449754976497749784979498049814982498349844985498649874988498949904991499249934994499549964997499849995000500150025003500450055006500750085009501050115012501350145015501650175018501950205021502250235024502550265027502850295030503150325033503450355036503750385039504050415042504350445045504650475048504950505051505250535054505550565057505850595060506150625063506450655066506750685069507050715072507350745075507650775078507950805081508250835084508550865087508850895090509150925093509450955096509750985099510051015102510351045105510651075108510951105111511251135114511551165117511851195120512151225123512451255126512751285129513051315132513351345135513651375138513951405141514251435144514551465147514851495150515151525153515451555156515751585159516051615162516351645165516651675168516951705171517251735174517551765177517851795180518151825183518451855186518751885189519051915192519351945195519651975198519952005201520252035204520552065207520852095210521152125213521452155216521752185219522052215222522352245225522652275228522952305231523252335234523552365237523852395240524152425243524452455246524752485249525052515252525352545255525652575258525952605261526252635264526552665267526852695270527152725273527452755276527
75278527952805281528252835284528552865287528852895290529152925293529452955296529752985299530053015302530353045305530653075308530953105311531253135314531553165317531853195320532153225323532453255326532753285329533053315332533353345335533653375338533953405341534253435344534553465347534853495350535153525353535453555356535753585359536053615362536353645365536653675368536953705371537253735374537553765377537853795380538153825383538453855386538753885389539053915392539353945395539653975398539954005401540254035404540554065407540854095410541154125413541454155416541754185419542054215422542354245425542654275428542954305431543254335434543554365437543854395440544154425443544454455446544754485449545054515452545354545455545654575458545954605461546254635464546554665467546854695470547154725473547454755476547754785479548054815482548354845485548654875488548954905491549254935494549554965497549854995500550155025503550455055506550755085509551055115512551355145515551655175518551955205521552255235524552555265527552855295530553155325533553455355536553755385539554055415542554355445545554655475548554955505551555255535554555555565557555855595560556155625563556455655566556755685569557055715572557355745575557655775578557955805581558255835584558555865587558855895590559155925593559455955596559755985599560056015602560356045605560656075608560956105611561256135614561556165617561856195620562156225623562456255626562756285629563056315632563356345635563656375638563956405641564256435644564556465647564856495650565156525653565456555656565756585659566056615662566356645665566656675668566956705671567256735674567556765677567856795680568156825683568456855686568756885689569056915692569356945695569656975698569957005701570257035704570557065707570857095710571157125713571457155716571757185719572057215722572357245725572657275728572957305731573257335734573557365737573857395740574157425743574457455746574757485749575057515752575357545755575657575758575957605761576257635764576557665767576857695770577157725773577457755776577
75778577957805781578257835784578557865787578857895790579157925793579457955796579757985799580058015802580358045805580658075808580958105811581258135814581558165817581858195820582158225823582458255826582758285829583058315832583358345835583658375838583958405841584258435844584558465847584858495850585158525853585458555856585758585859586058615862586358645865586658675868586958705871587258735874587558765877587858795880588158825883588458855886588758885889589058915892589358945895589658975898589959005901590259035904590559065907590859095910591159125913591459155916591759185919592059215922592359245925592659275928592959305931593259335934593559365937593859395940594159425943594459455946594759485949595059515952595359545955595659575958595959605961596259635964596559665967596859695970597159725973597459755976597759785979598059815982598359845985598659875988598959905991599259935994599559965997599859996000600160026003600460056006600760086009601060116012601360146015601660176018601960206021602260236024602560266027602860296030603160326033603460356036603760386039604060416042604360446045604660476048604960506051605260536054605560566057605860596060606160626063606460656066606760686069607060716072607360746075607660776078607960806081608260836084608560866087608860896090609160926093609460956096609760986099610061016102610361046105610661076108610961106111611261136114611561166117611861196120612161226123612461256126612761286129613061316132613361346135613661376138613961406141614261436144614561466147614861496150615161526153615461556156615761586159616061616162616361646165616661676168616961706171617261736174617561766177617861796180618161826183618461856186618761886189619061916192619361946195619661976198619962006201620262036204620562066207620862096210621162126213621462156216621762186219622062216222622362246225622662276228622962306231623262336234623562366237623862396240624162426243624462456246624762486249625062516252625362546255625662576258625962606261626262636264626562666267626862696270627162726273627462756276627
76278627962806281628262836284628562866287628862896290629162926293629462956296629762986299630063016302630363046305630663076308630963106311631263136314631563166317631863196320632163226323632463256326632763286329633063316332633363346335633663376338633963406341634263436344634563466347634863496350635163526353635463556356635763586359636063616362636363646365636663676368636963706371637263736374637563766377637863796380638163826383638463856386638763886389639063916392639363946395639663976398639964006401640264036404640564066407640864096410641164126413641464156416641764186419642064216422642364246425642664276428642964306431643264336434643564366437643864396440644164426443644464456446644764486449645064516452645364546455645664576458645964606461646264636464646564666467646864696470647164726473647464756476647764786479648064816482648364846485648664876488648964906491649264936494649564966497649864996500650165026503650465056506650765086509651065116512651365146515651665176518651965206521652265236524652565266527652865296530653165326533653465356536653765386539654065416542654365446545654665476548654965506551655265536554655565566557655865596560656165626563656465656566656765686569657065716572657365746575657665776578657965806581658265836584658565866587658865896590659165926593659465956596659765986599660066016602660366046605660666076608660966106611661266136614661566166617
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. // New IPv6 Version - IN PROGRESS
  14. /*
  15. TBD IPv6 connect
  16. multicast
  17. look at loopback
  18. */
  19. #include "platform.h"
  20. #ifdef _VER_C5
  21. #include <clwclib.h>
  22. #else
  23. #include "platform.h"
  24. #include <stdio.h>
  25. #endif
  26. #include <algorithm>
  27. #ifdef _WIN32
  28. #define WIN32_LEAN_AND_MEAN
  29. #include <windows.h>
  30. #include <winsock2.h>
  31. #include <ws2tcpip.h>
  32. #include <signal.h>
  33. #else
  34. #include <sys/types.h>
  35. #include <sys/socket.h>
  36. #include <netinet/tcp.h>
  37. #include <netinet/in.h>
  38. #include <arpa/inet.h>
  39. #include <stddef.h>
  40. #include <errno.h>
  41. #include <net/if.h>
  42. #include <poll.h>
  43. #endif
  44. #include <limits.h>
  45. #include "jmutex.hpp"
  46. #include "jsocket.hpp"
  47. #include "jexcept.hpp"
  48. #include "jio.hpp"
  49. #include "jmisc.hpp"
  50. #include "jthread.hpp"
  51. #include "jqueue.tpp"
  52. #include "jtime.hpp"
  53. #include "jprop.hpp"
  54. #include "jregexp.hpp"
  55. #include "jdebug.hpp"
  56. #include "build-config.h"
  57. // epoll is only available on Linux
  58. #ifndef __linux__
  59. # undef _HAS_EPOLL_SUPPORT
  60. #else
  61. # define _HAS_EPOLL_SUPPORT
  62. # ifdef _HAS_EPOLL_SUPPORT // always true here: the macro was defined on the previous line
  63. # include <unistd.h>
  64. # include <sys/epoll.h>
  65. # ifndef EPOLLRDHUP
  66. // CentOS 5.x bug - epoll.h does not define EPOLLRDHUP, but it is in the kernel
  67. # define EPOLLRDHUP 0x2000
  68. # endif
  69. # define MAX_RET_EVENTS 5000 // max events returned from a single epoll_wait() call
  70. # endif
  71. #endif
  72. // various compile-time options
  73. #define CONNECT_TIMEOUT_REFUSED_WAIT 1000 // maximum to sleep on connect_timeout (presumably milliseconds - TODO confirm)
  74. #define TRACE_SLOW_BLOCK_TRANSFER
  75. #define DEFAULT_CONNECT_TIME (100*1000) // for connect_wait (presumably milliseconds - TODO confirm)
  76. #ifndef _WIN32
  77. #define BLOCK_POLLED_SINGLE_CONNECTS // NB this is much slower on Windows, so it is only defined for non-Windows builds
  78. #define CENTRAL_NODE_RANDOM_DELAY
  79. #else
  80. #define USERECVSEM // to single-thread BF_SYNC_TRANSFER_PUSH
  81. #endif
  82. #ifdef _DEBUG // optional trace switches for debug builds; both are commented out (disabled) by default
  83. //#define SOCKTRACE
  84. //#define EPOLLTRACE
  85. #endif
  86. #ifdef _TESTING // testing builds define _TRACE, which selects the verbose exception/logging macros that follow
  87. #define _TRACE
  88. #endif
  89. #ifdef _TRACE // tracing build: thrown exceptions carry the raising source file and line, and LOGERR* forward to LogErr
  90. #define THROWJSOCKEXCEPTION(exc) /* throws a new SocketException(exc,msg); msg includes tracename, which must be in scope at the use site */ \
  91. { StringBuffer msg; \
  92. msg.appendf("Target: %s, Raised in: %s, line %d",tracename ,sanitizeSourceFile(__FILE__), __LINE__); \
  93. IJSOCK_Exception *e = new SocketException(exc,msg.str());\
  94. throw e; }
  95. #define THROWJSOCKEXCEPTION2(exc) /* same as THROWJSOCKEXCEPTION but does not reference tracename */ \
  96. { StringBuffer msg; \
  97. msg.appendf("Raised in: %s, line %d",sanitizeSourceFile(__FILE__), __LINE__); \
  98. IJSOCK_Exception *e = new SocketException(exc,msg.str());\
  99. throw e; }
  100. #define LOGERR(err,ref,info) LogErr(err,ref,info,__LINE__,NULL) // logs with the current line number and no trace name
  101. #define LOGERR2(err,ref,info) LogErr(err,ref,info,__LINE__,tracename) // as LOGERR, but also passes tracename from the use site
  102. #else // non-tracing build: exceptions carry only the error code, and LOGERR* expand to nothing
  103. #define THROWJSOCKEXCEPTION(exc) /* throws a new SocketException(exc) with no message */ \
  104. { IJSOCK_Exception *e = new SocketException(exc);\
  105. throw e; }
  106. #define THROWJSOCKEXCEPTION2(exc) THROWJSOCKEXCEPTION(exc)
  107. #define LOGERR(err,ref,info) // no-op when _TRACE is not defined
  108. #define LOGERR2(err,ref,info) // no-op when _TRACE is not defined
  109. #endif
  110. JSocketStatistics STATS; // NOTE(review): non-static global; presumably process-wide socket statistics - confirm at use sites
  111. static bool IP4only=false; // slightly faster if we know there is no IPv6
  112. static bool IP6preferred=false; // e.g. for DNS and socket create
  113. IpSubNet PreferredSubnet(NULL,NULL); // set this if you prefer a particular subnet for debugging etc
  114. // e.g. PreferredSubnet("192.168.16.0", "255.255.255.0")
  115. static RelaxedAtomic<unsigned> pre_conn_unreach_cnt{0}; // global count of pre_connect() JSE_NETUNREACH errors
  116. #define IPV6_SERIALIZE_PREFIX (0x00ff00ff) // NOTE(review): presumably a marker prefix for serialized IPv6 addresses - confirm at use sites
  117. class jlib_thrown_decl SocketException: public IJSOCK_Exception, public CInterface
  118. {
  119. public:
  120. IMPLEMENT_IINTERFACE;
  121. SocketException(int code,const char *_msg=NULL) : errcode(code)
  122. {
  123. if (_msg)
  124. msg = strdup(_msg);
  125. else
  126. msg = NULL;
  127. };
  128. ~SocketException() { free(msg); }
  129. int errorCode() const { return errcode; }
  130. static StringBuffer & geterrormessage(int err,StringBuffer &str)
  131. {
  132. switch (err) {
  133. case JSOCKERR_ok: return str.append("ok");
  134. case JSOCKERR_not_opened: return str.append("socket not opened");
  135. case JSOCKERR_bad_address: return str.append("bad address");
  136. case JSOCKERR_connection_failed: return str.append("connection failed");
  137. case JSOCKERR_broken_pipe: return str.append("connection is broken");
  138. case JSOCKERR_graceful_close: return str.append("connection closed other end");
  139. case JSOCKERR_invalid_access_mode: return str.append("invalid access mode");
  140. case JSOCKERR_timeout_expired: return str.append("timeout expired");
  141. case JSOCKERR_port_in_use: return str.append("port in use");
  142. case JSOCKERR_cancel_accept: return str.append("cancel accept");
  143. case JSOCKERR_connectionless_socket: return str.append("connectionless socket");
  144. case JSOCKERR_handle_too_large: return str.append("handle too large");
  145. case JSOCKERR_bad_netaddr: return str.append("bad net addr");
  146. case JSOCKERR_ipv6_not_implemented: return str.append("IPv6 not implemented");
  147. // OS errors
  148. #ifdef _WIN32
  149. case WSAEINTR: return str.append("WSAEINTR(10004) - Interrupted system call.");
  150. case WSAEBADF: return str.append("WSAEBADF(10009) - Bad file number.");
  151. case WSAEACCES: return str.append("WSAEACCES(10013) - Permission denied.");
  152. case WSAEFAULT: return str.append("WSAEFAULT(10014) - Bad address.");
  153. case WSAEINVAL: return str.append("WSAEINVAL(10022) - Invalid argument.");
  154. case WSAEMFILE: return str.append("WSAEMFILE(10024) - Too many open files.");
  155. case WSAEWOULDBLOCK: return str.append("WSAEWOULDBLOCK(10035) - Operation would block.");
  156. case WSAEINPROGRESS: return str.append("WSAEINPROGRESS(10036) - Operation now in progress.");
  157. case WSAEALREADY: return str.append("WSAEALREADY(10037) - Operation already in progress.");
  158. case WSAENOTSOCK: return str.append("WSAENOTSOCK(10038) - Socket operation on nonsocket.");
  159. case WSAEDESTADDRREQ: return str.append("WSAEDESTADDRREQ(10039) - Destination address required.");
  160. case WSAEMSGSIZE: return str.append("WSAEMSGSIZE(10040) - Message too long.");
  161. case WSAEPROTOTYPE: return str.append("WSAEPROTOTYPE(10041) - Protocol wrong type for socket.");
  162. case WSAENOPROTOOPT: return str.append("WSAENOPROTOOPT(10042) - Protocol not available.");
  163. case WSAEPROTONOSUPPORT: return str.append("WSAEPROTONOSUPPORT(10043) - Protocol not supported.");
  164. case WSAESOCKTNOSUPPORT: return str.append("WSAESOCKTNOSUPPORT(10044) - Socket type not supported.");
  165. case WSAEOPNOTSUPP: return str.append("WSAEOPNOTSUPP(10045) - Operation not supported on socket.");
  166. case WSAEPFNOSUPPORT: return str.append("WSAEPFNOSUPPORT(10046) - Protocol family not supported.");
  167. case WSAEAFNOSUPPORT: return str.append("WSAEAFNOSUPPORT(10047) - Address family not supported by protocol family.");
  168. case WSAEADDRINUSE: return str.append("WSAEADDRINUSE(10048) - Address already in use.");
  169. case WSAEADDRNOTAVAIL: return str.append("WSAEADDRNOTAVAIL(10049) - Cannot assign requested address.");
  170. case WSAENETDOWN: return str.append("WSAENETDOWN(10050) - Network is down.");
  171. case WSAENETUNREACH: return str.append("WSAENETUNREACH(10051) - Network is unreachable.");
  172. case WSAENETRESET: return str.append("WSAENETRESET(10052) - Network dropped connection on reset.");
  173. case WSAECONNABORTED: return str.append("WSAECONNABORTED(10053) - Software caused connection abort.");
  174. case WSAECONNRESET: return str.append("WSAECONNRESET(10054) - Connection reset by peer.");
  175. case WSAENOBUFS: return str.append("WSAENOBUFS(10055) - No buffer space available.");
  176. case WSAEISCONN: return str.append("WSAEISCONN(10056) - Socket is already connected.");
  177. case WSAENOTCONN: return str.append("WSAENOTCONN(10057) - Socket is not connected.");
  178. case WSAESHUTDOWN: return str.append("WSAESHUTDOWN(10058) - Cannot send after socket shutdown.");
  179. case WSAETOOMANYREFS: return str.append("WSAETOOMANYREFS(10059) - Too many references: cannot splice.");
  180. case WSAETIMEDOUT: return str.append("WSAETIMEDOUT(10060) - Connection timed out.");
  181. case WSAECONNREFUSED: return str.append("WSAECONNREFUSED(10061) - Connection refused.");
  182. case WSAELOOP: return str.append("WSAELOOP(10062) - Too many levels of symbolic links.");
  183. case WSAENAMETOOLONG: return str.append("WSAENAMETOOLONG(10063) - File name too long.");
  184. case WSAEHOSTDOWN: return str.append("WSAEHOSTDOWN(10064) - Host is down.");
  185. case WSAEHOSTUNREACH: return str.append("WSAEHOSTUNREACH(10065) - No route to host.");
  186. case WSASYSNOTREADY: return str.append("WSASYSNOTREADY(10091) - The network subsystem is unusable.");
  187. case WSAVERNOTSUPPORTED: return str.append("WSAVERNOTSUPPORTED(10092) - The Windows Sockets DLL cannot support this application.");
  188. case WSANOTINITIALISED: return str.append("WSANOTINITIALISED(10093) - Winsock not initialized.");
  189. case WSAEDISCON: return str.append("WSAEDISCON(10101) - Disconnect.");
  190. case WSAHOST_NOT_FOUND: return str.append("WSAHOST_NOT_FOUND(11001) - Host not found.");
  191. case WSATRY_AGAIN: return str.append("WSATRY_AGAIN(11002) - Nonauthoritative host not found.");
  192. case WSANO_RECOVERY: return str.append("WSANO_RECOVERY(11003) - Nonrecoverable error.");
  193. case WSANO_DATA: return str.append("WSANO_DATA(11004) - Valid name, no data record of requested type.");
  194. #else
  195. case ENOTSOCK: return str.append("ENOTSOCK - Socket operation on non-socket ");
  196. case EDESTADDRREQ: return str.append("EDESTADDRREQ - Destination address required ");
  197. case EMSGSIZE: return str.append("EMSGSIZE - Message too long ");
  198. case EPROTOTYPE: return str.append("EPROTOTYPE - Protocol wrong type for socket ");
  199. case ENOPROTOOPT: return str.append("ENOPROTOOPT - Protocol not available ");
  200. case EPROTONOSUPPORT: return str.append("EPROTONOSUPPORT - Protocol not supported ");
  201. case ESOCKTNOSUPPORT: return str.append("ESOCKTNOSUPPORT - Socket type not supported ");
  202. case EOPNOTSUPP: return str.append("EOPNOTSUPP - Operation not supported on socket ");
  203. case EPFNOSUPPORT: return str.append("EPFNOSUPPORT - Protocol family not supported ");
  204. case EAFNOSUPPORT: return str.append("EAFNOSUPPORT - Address family not supported by protocol family ");
  205. case EADDRINUSE: return str.append("EADDRINUSE - Address already in use ");
  206. case EADDRNOTAVAIL: return str.append("EADDRNOTAVAIL - Can't assign requested address ");
  207. case ENETDOWN: return str.append("ENETDOWN - Network is down ");
  208. case ENETUNREACH: return str.append("ENETUNREACH - Network is unreachable ");
  209. case ENETRESET: return str.append("ENETRESET - Network dropped connection because of reset ");
  210. case ECONNABORTED: return str.append("ECONNABORTED - Software caused connection abort ");
  211. case ECONNRESET: return str.append("ECONNRESET - Connection reset by peer ");
  212. case ENOBUFS: return str.append("ENOBUFS - No buffer space available ");
  213. case EISCONN: return str.append("EISCONN - Socket is already connected ");
  214. case ENOTCONN: return str.append("ENOTCONN - Socket is not connected ");
  215. case ESHUTDOWN: return str.append("ESHUTDOWN - Can't send after socket shutdown ");
  216. case ETOOMANYREFS: return str.append("ETOOMANYREFS - Too many references: can't splice ");
  217. case ETIMEDOUT: return str.append("ETIMEDOUT - Connection timed out ");
  218. case ECONNREFUSED: return str.append("ECONNREFUSED - Connection refused ");
  219. case EHOSTDOWN: return str.append("EHOSTDOWN - Host is down ");
  220. case EHOSTUNREACH: return str.append("EHOSTUNREACH - No route to host ");
  221. case EWOULDBLOCK: return str.append("EWOULDBLOCK - operation already in progress");
  222. case EINPROGRESS: return str.append("EINPROGRESS - operation now in progress ");
  223. #endif
  224. }
  225. IException *ose = makeOsException(err);
  226. ose->errorMessage(str);
  227. ose->Release();
  228. return str;
  229. }
  230. StringBuffer & errorMessage(StringBuffer &str) const
  231. {
  232. if (msg)
  233. return geterrormessage(errcode,str).append('\n').append(msg);
  234. return geterrormessage(errcode,str);
  235. }
  236. MessageAudience errorAudience() const
  237. {
  238. switch (errcode) {
  239. case JSOCKERR_port_in_use:
  240. return MSGAUD_operator;
  241. }
  242. return MSGAUD_user;
  243. }
  244. private:
  245. int errcode;
  246. char *msg;
  247. };
  248. IJSOCK_Exception *IPv6NotImplementedException(const char *filename,unsigned lineno)
  249. {
  250. StringBuffer msg;
  251. msg.appendf("%s(%d)",filename,lineno);
  252. return new SocketException(JSOCKERR_ipv6_not_implemented,msg.str());
  253. }
  254. struct MCASTREQ
  255. {
  256. struct in_addr imr_multiaddr; /* multicast group to join */
  257. struct in_addr imr_interface; /* interface to join on */
  258. MCASTREQ(const char *mcip)
  259. {
  260. imr_multiaddr.s_addr = inet_addr(mcip);
  261. imr_interface.s_addr = htonl(INADDR_ANY);
  262. }
  263. };
// On Apple builds MSG_NOSIGNAL may be missing; give it a value so that
// SEND_FLAGS below still compiles.
#ifdef __APPLE__
#ifndef MSG_NOSIGNAL
#define MSG_NOSIGNAL 0x4000
#endif
#endif

// Per-platform socket abstractions:
//   T_SOCKET          - native socket handle type
//   T_FD_SET          - descriptor set type
//   XFD_SETSIZE       - capacity of T_FD_SET
//   XFD_ZERO(s)       - clears a T_FD_SET
//   SEND_FLAGS        - flags value built from MSG_NOSIGNAL where available
//   CHECKSOCKRANGE(s) - range check on a handle (empty where not needed)
#if defined( _WIN32)
#define T_SOCKET SOCKET
#define T_FD_SET fd_set
#define XFD_SETSIZE FD_SETSIZE
//Following are defined in more modern headers
#define XFD_ZERO(s) FD_ZERO(s)
#define SEND_FLAGS 0
#define CHECKSOCKRANGE(s)
#define _USE_SELECT // Windows bug 309411 - WSAPoll does not report failed connections - wont fix
// #define poll(a, b, c) WSAPoll((a), (b), (c))
#elif defined(__FreeBSD__) || defined(__APPLE__)
#define XFD_SETSIZE FD_SETSIZE
#define T_FD_SET fd_set
#define XFD_ZERO(s) FD_ZERO(s)
#define T_SOCKET int
#define SEND_FLAGS (MSG_NOSIGNAL)
#define CHECKSOCKRANGE(s)
#else
// Remaining platforms use a private descriptor set with room for 32768 handles.
#define XFD_SETSIZE 32768
struct xfd_set { __fd_mask fds_bits[XFD_SETSIZE / __NFDBITS]; }; // define our own
// linux only (the inner block applies to 64 bit builds)
#ifdef __linux__
#ifdef __64BIT__
// Redefine the libc bit helpers so they index by descriptor number without
// reference to the standard fd_set size.
#undef __FDMASK
#define __FDMASK(d) (1UL << ((d) % __NFDBITS))
#undef __FDELT
#define __FDELT(d) ((d) / __NFDBITS)
#undef __FD_SET
#define __FD_SET(d, s) (__FDS_BITS (s)[__FDELT(d)] |= __FDMASK(d))
#undef __FD_ISSET
#define __FD_ISSET(d, s) ((__FDS_BITS (s)[__FDELT(d)] & __FDMASK(d)) != 0)
#endif
// Raises JSOCKERR_handle_too_large via THROWJSOCKEXCEPTION2 when the handle
// does not fit in xfd_set.
// NOTE(review): only defined inside the __linux__ block; other platforms
// reaching this #else branch get no CHECKSOCKRANGE definition here.
#define CHECKSOCKRANGE(s) { if (s>=XFD_SETSIZE) THROWJSOCKEXCEPTION2(JSOCKERR_handle_too_large); }
#endif
// end of linux-only section
#define T_FD_SET xfd_set
#define XFD_ZERO(s) memset(s,0,sizeof(xfd_set))
#define T_SOCKET int
#define SEND_FLAGS (MSG_NOSIGNAL)
#endif
#ifdef CENTRAL_NODE_RANDOM_DELAY
static SocketEndpointArray CentralNodeArray;
#endif
// Role of a CSocket: TCP, UDP or multicast, each in a server and a non-server form.
enum SOCKETMODE { sm_tcp_server, sm_tcp, sm_udp_server, sm_udp, sm_multicast_server, sm_multicast};
// True when err is JSE_BADF or JSE_NOTSOCK.
#define BADSOCKERR(err) ((err==JSE_BADF)||(err==JSE_NOTSOCK))
  314. class CSocket: public ISocket, public CInterface
  315. {
  316. public:
  317. IMPLEMENT_IINTERFACE;
  318. static CriticalSection crit;
  319. protected:
  320. friend class CSocketConnectWait;
  321. enum { ss_open, ss_shutdown, ss_close, ss_pre_open } state;
  322. T_SOCKET sock;
  323. char* hostname; // host address
  324. unsigned short hostport; // host port
  325. unsigned short localPort;
  326. SOCKETMODE sockmode;
  327. IpAddress targetip;
  328. SocketEndpoint returnep; // set by set_return_addr
  329. MCASTREQ * mcastreq;
  330. size32_t nextblocksize;
  331. unsigned blockflags;
  332. unsigned blocktimeoutms;
  333. bool owned;
  334. enum {accept_not_cancelled, accept_cancel_pending, accept_cancelled} accept_cancel_state;
  335. bool in_accept;
  336. bool nonblocking;
  337. bool nagling;
  338. static unsigned connectingcount;
  339. #ifdef USERECVSEM
  340. static Semaphore receiveblocksem;
  341. bool receiveblocksemowned; // owned by this socket
  342. #endif
  343. #ifdef _TRACE
  344. char * tracename;
  345. #endif
  346. public:
  347. void open(int listen_queue_size,bool reuseports=false);
  348. bool connect_timeout( unsigned timeout, bool noexception);
  349. void connect_wait( unsigned timems);
  350. void udpconnect();
  351. void read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,unsigned timeoutsecs);
  352. void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timedelaysecs);
  353. void read(void* buf, size32_t size);
  354. size32_t write(void const* buf, size32_t size);
  355. size32_t writetms(void const* buf, size32_t size, unsigned timeoutms=WAIT_FOREVER);
  356. size32_t write_multiple(unsigned num,void const**buf, size32_t *size);
  357. size32_t udp_write_to(const SocketEndpoint &ep,void const* buf, size32_t size);
  358. void close();
  359. void errclose();
  360. bool connectionless() { return (sockmode!=sm_tcp)&&(sockmode!=sm_tcp_server); }
  361. void shutdown(unsigned mode=SHUTDOWN_READWRITE);
  362. ISocket* accept(bool allowcancel);
  363. int wait_read(unsigned timeout);
  364. void logPollError(unsigned revents, const char *rwstr);
  365. int wait_write(unsigned timeout);
  366. int name(char *name,size32_t namemax);
  367. int peer_name(char *name,size32_t namemax);
  368. SocketEndpoint &getPeerEndpoint(SocketEndpoint &ep);
  369. IpAddress & getPeerAddress(IpAddress &addr);
  370. SocketEndpoint &getEndpoint(SocketEndpoint &ep) const override;
  371. void set_return_addr(int port,const char *name); // sets returnep
  372. void cancel_accept();
  373. size32_t get_max_send_size();
  374. bool set_nonblock(bool on=true);
  375. bool set_nagle(bool on);
  376. void set_linger(int lingersecs);
  377. void set_keep_alive(bool set);
  378. void logConnectionInfo(unsigned timeoutms, unsigned conn_mstime);
  379. virtual void set_inherit(bool inherit=false);
  380. virtual bool check_connection();
  381. virtual bool isSecure() const override;
  382. // Block functions
  383. void set_block_mode(unsigned flags,size32_t recsize=0,unsigned timeoutms=0);
  384. bool send_block(const void *blk,size32_t sz);
  385. size32_t receive_block_size();
  386. size32_t receive_block(void *blk,size32_t sz);
  387. size32_t get_send_buffer_size();
  388. void set_send_buffer_size(size32_t sz);
  389. bool join_multicast_group(SocketEndpoint &ep); // for udp multicast
  390. bool leave_multicast_group(SocketEndpoint &ep); // for udp multicast
  391. void set_ttl(unsigned _ttl);
  392. size32_t get_receive_buffer_size();
  393. void set_receive_buffer_size(size32_t sz);
  394. size32_t avail_read();
  395. int pre_connect(bool block);
  396. int post_connect();
  397. void setTraceName(const char * prefix, const char * name);
  398. void setTraceName();
  399. CSocket(const SocketEndpoint &_ep,SOCKETMODE smode,const char *name);
  400. CSocket(T_SOCKET new_sock,SOCKETMODE smode,bool _owned);
  401. virtual ~CSocket();
  402. unsigned OShandle()
  403. {
  404. return (unsigned)sock;
  405. }
  406. private:
  407. int closesock()
  408. {
  409. if (sock!=INVALID_SOCKET) {
  410. T_SOCKET s = sock;
  411. sock = INVALID_SOCKET;
  412. STATS.activesockets--;
  413. #ifdef SOCKTRACE
  414. PROGLOG("SOCKTRACE: Closing socket %x %d (%p)", s, s, this);
  415. #endif
  416. #ifdef _WIN32
  417. return ::closesocket(s);
  418. #else
  419. return ::close(s);
  420. #endif
  421. }
  422. else
  423. return 0;
  424. }
  425. };
// Definitions of the static members declared in CSocket.
CriticalSection CSocket::crit;
unsigned CSocket::connectingcount=0;
#ifdef USERECVSEM
// Constructed with an initial value of 2.
Semaphore CSocket::receiveblocksem(2);
#endif
  431. #ifdef _WIN32
  432. class win_socket_library
  433. {
  434. static bool initdone; // to prevent dependancy probs very early on (e.g. jlog)
  435. public:
  436. win_socket_library() { init(); }
  437. bool init()
  438. {
  439. if (initdone)
  440. return true;
  441. WSADATA wsa;
  442. if (WSAStartup(MAKEWORD(2, 2), &wsa) != 0) {
  443. if (WSAStartup(MAKEWORD(1, 1), &wsa) != 0) {
  444. MessageBox(NULL,"Failed to initialize windows sockets","JLib Socket Error",MB_OK);
  445. return false;
  446. }
  447. }
  448. initdone = true;
  449. return true;
  450. }
  451. ~win_socket_library()
  452. {
  453. WSACleanup();
  454. }
  455. };
  456. bool win_socket_library::initdone = false;
  457. static win_socket_library ws32_lib;
// Windows mappings: ERRNO() reads the last Winsock error and each JSE_* name
// maps to the corresponding WSAE* code.
#define ERRNO() WSAGetLastError()
#define JSE_ADDRINUSE WSAEADDRINUSE
#define JSE_CONNRESET WSAECONNRESET
#define JSE_CONNABORTED WSAECONNABORTED
#define JSE_NOTCONN WSAENOTCONN
#define JSE_WOULDBLOCK WSAEWOULDBLOCK
#define JSE_INPROGRESS WSAEINPROGRESS
#define JSE_NETUNREACH WSAENETUNREACH
#define JSE_NOTSOCK WSAENOTSOCK
#define JSE_TIMEDOUT WSAETIMEDOUT
#define JSE_CONNREFUSED WSAECONNREFUSED
#define JSE_BADF WSAEBADF
#define JSE_INTR WSAEINTR
// Locally declared IPv6 socket address structure used by J_SOCKADDR on Windows.
struct j_sockaddr_in6 {
    short sin6_family; /* AF_INET6 */
    u_short sin6_port; /* Transport level port number */
    u_long sin6_flowinfo; /* IPv6 flow information */
    struct in_addr6 sin6_addr; /* IPv6 address */
    u_long sin6_scope_id; /* set of interfaces for a scope */
};
// One buffer that can be viewed as a generic, IPv4 or IPv6 socket address.
typedef union {
    struct sockaddr sa;
    struct j_sockaddr_in6 sin6;
    struct sockaddr_in sin;
} J_SOCKADDR;
// Declares a J_SOCKADDR called `name` and zero fills it.
#define DEFINE_SOCKADDR(name) J_SOCKADDR name; memset(&name,0,sizeof(J_SOCKADDR))
  484. static int _inet_pton(int af, const char* src, void* dst)
  485. {
  486. DEFINE_SOCKADDR(u);
  487. int address_length;
  488. switch (af) {
  489. case AF_INET:
  490. u.sin.sin_family = AF_INET;
  491. address_length = sizeof (u.sin);
  492. break;
  493. case AF_INET6:
  494. u.sin6.sin6_family = AF_INET6;
  495. address_length = sizeof (u.sin6);
  496. break;
  497. default:
  498. #ifdef EAFNOSUPPORT
  499. errno = EAFNOSUPPORT;
  500. #else
  501. errno = 52;
  502. #endif
  503. return -1;
  504. }
  505. ws32_lib.init();
  506. int ret = WSAStringToAddress ((LPTSTR) src, af, NULL, &u.sa, &address_length);
  507. if (ret == 0) {
  508. switch (af) {
  509. case AF_INET:
  510. memcpy (dst, &u.sin.sin_addr, sizeof (struct in_addr));
  511. break;
  512. case AF_INET6:
  513. memcpy (dst, &u.sin6.sin6_addr, sizeof (u.sin6.sin6_addr));
  514. break;
  515. }
  516. return 1;
  517. }
  518. errno = WSAGetLastError();
  519. // PROGLOG("errno = %d",errno);
  520. return 0;
  521. }
  522. static const char * _inet_ntop (int af, const void *src, char *dst, socklen_t cnt)
  523. {
  524. /* struct sockaddr can't accomodate struct sockaddr_in6. */
  525. DEFINE_SOCKADDR(u);
  526. DWORD dstlen = cnt;
  527. size_t srcsize;
  528. memset(&u,0,sizeof(u));
  529. switch (af) {
  530. case AF_INET:
  531. u.sin.sin_family = AF_INET;
  532. u.sin.sin_addr = *(struct in_addr *) src;
  533. srcsize = sizeof (u.sin);
  534. break;
  535. case AF_INET6:
  536. u.sin6.sin6_family = AF_INET6;
  537. memcpy(&u.sin6.sin6_addr,src,sizeof(in_addr6));
  538. srcsize = sizeof (u.sin6);
  539. break;
  540. default:
  541. return NULL;
  542. }
  543. ws32_lib.init();
  544. if (WSAAddressToString (&u.sa, srcsize, NULL, dst, &dstlen) != 0) {
  545. errno = WSAGetLastError();
  546. return NULL;
  547. }
  548. return (const char *) dst;
  549. }
  550. int inet_aton (const char *name, struct in_addr *addr)
  551. {
  552. addr->s_addr = inet_addr (name);
  553. return (addr->s_addr == (u_long)-1)?1:0; // 255.255.255.255 has had it here
  554. }
#else
// Non-Windows mappings: each JSE_* name maps to the matching errno value.
#define JSE_ADDRINUSE EADDRINUSE
#define JSE_CONNRESET ECONNRESET
#define JSE_CONNABORTED ECONNABORTED
#define JSE_NOTCONN ENOTCONN
#define JSE_WOULDBLOCK EWOULDBLOCK
#define JSE_INPROGRESS EINPROGRESS
#define JSE_NETUNREACH ENETUNREACH
#define JSE_NOTSOCK ENOTSOCK
#define JSE_TIMEDOUT ETIMEDOUT
#define JSE_CONNREFUSED ECONNREFUSED
#define JSE_BADF EBADF
// The system conversion routines are used directly, and in_addr6 is an alias
// for the system in6_addr type.
#define _inet_ntop inet_ntop
#define _inet_pton inet_pton
#define in_addr6 in6_addr
// One buffer that can be viewed as a generic, IPv4 or IPv6 socket address.
typedef union {
    struct sockaddr sa;
    struct sockaddr_in6 sin6;
    struct sockaddr_in sin;
} J_SOCKADDR;
// Declares a J_SOCKADDR called `name` and zero fills it.
#define DEFINE_SOCKADDR(name) J_SOCKADDR name; memset(&name,0,sizeof(J_SOCKADDR))
#define JSE_INTR EINTR
#define ERRNO() (errno)
#ifndef INADDR_NONE
#define INADDR_NONE (-1)
#endif
#endif
// Fallback for headers that do not provide INET6_ADDRSTRLEN.
#ifndef INET6_ADDRSTRLEN
#define INET6_ADDRSTRLEN 65
#endif
// Exported wrapper that hands jsockErr to the THROWJSOCKEXCEPTION2 macro.
// NOTE(review): the macro is defined outside this section; presumably it
// throws a socket exception carrying jsockErr - confirm against its definition.
extern jlib_decl void throwJSocketException(int jsockErr)
{
    THROWJSOCKEXCEPTION2(jsockErr);
}
  589. inline void LogErr(unsigned err,unsigned ref,const char *info,unsigned lineno,const char *tracename)
  590. {
  591. if (err)
  592. {
  593. PROGLOG("jsocket(%d,%d)%s%s err = %d%s%s",ref,lineno,
  594. (info&&*info)?" ":"",(info&&*info)?info:"",err,
  595. (tracename&&*tracename)?" : ":"",(tracename&&*tracename)?tracename:"");
  596. #ifdef _TRACELINKCLOSED
  597. if ((JSE_NOTCONN == err) || (JSE_CONNRESET == err) || (JSE_CONNABORTED == err))
  598. {
  599. PROGLOG("Socket not connected, stack:");
  600. PrintStackReport();
  601. }
  602. #endif
  603. }
  604. }
  605. inline socklen_t setSockAddr(J_SOCKADDR &u, const IpAddress &ip,unsigned short port)
  606. {
  607. if (!IP6preferred) {
  608. if (ip.getNetAddress(sizeof(in_addr),&u.sin.sin_addr)==sizeof(in_addr)) {
  609. u.sin.sin_family = AF_INET;
  610. u.sin.sin_port = htons(port);
  611. return sizeof(u.sin);
  612. }
  613. }
  614. if (IP4only)
  615. IPV6_NOT_IMPLEMENTED();
  616. ip.getNetAddress(sizeof(in_addr6),&u.sin6.sin6_addr);
  617. u.sin6.sin6_family = AF_INET6;
  618. u.sin6.sin6_port = htons(port);
  619. return sizeof(u.sin6);
  620. }
  621. inline socklen_t setSockAddrAny(J_SOCKADDR &u, unsigned short port)
  622. {
  623. if (IP6preferred) {
  624. #ifdef _WIN32
  625. IN6ADDR_SETANY((PSOCKADDR_IN6)&u.sin6.sin6_addr);
  626. #else
  627. memcpy(&u.sin6.sin6_addr,&in6addr_any,sizeof(in_addr6));
  628. #endif
  629. u.sin6.sin6_family= AF_INET6;
  630. u.sin6.sin6_port = htons(port);
  631. return sizeof(u.sin6);
  632. }
  633. u.sin.sin_addr.s_addr = htonl(INADDR_ANY);
  634. u.sin.sin_family= AF_INET;
  635. u.sin.sin_port = htons(port);
  636. return sizeof(u.sin);
  637. }
  638. inline void getSockAddrEndpoint(const J_SOCKADDR &u, socklen_t ul, SocketEndpoint &ep)
  639. {
  640. if (ul==sizeof(u.sin)) {
  641. ep.setNetAddress(sizeof(in_addr),&u.sin.sin_addr);
  642. ep.port = htons(u.sin.sin_port);
  643. }
  644. else {
  645. ep.setNetAddress(sizeof(in_addr6),&u.sin6.sin6_addr);
  646. ep.port = htons(u.sin6.sin6_port);
  647. }
  648. }
  649. /* might need fcntl(F_SETFL), or ioctl(FIONBIO) */
  650. /* Posix.1g says fcntl */
  651. #if defined(O_NONBLOCK)
  652. bool CSocket::set_nonblock(bool on)
  653. {
  654. int flags = fcntl(sock, F_GETFL, 0);
  655. if (flags == -1)
  656. return nonblocking;
  657. if (on)
  658. flags |= O_NONBLOCK;
  659. else
  660. flags &= ~O_NONBLOCK;
  661. if (fcntl(sock, F_SETFL, flags)==0) {
  662. bool wasNonBlocking = nonblocking;
  663. nonblocking = on;
  664. return wasNonBlocking;
  665. }
  666. return nonblocking;
  667. }
  668. #else
  669. bool CSocket::set_nonblock(bool on)
  670. {
  671. #ifdef _WIN32
  672. u_long yes = on?1:0;
  673. if (ioctlsocket(sock, FIONBIO, &yes)==0) {
  674. #else
  675. int yes = on?1:0;
  676. if (ioctl(sock, FIONBIO, &yes)==0) {
  677. #endif
  678. bool wasNonBlocking = nonblocking;
  679. nonblocking = on;
  680. return wasNonBlocking;
  681. }
  682. return nonblocking;
  683. }
  684. #endif
  685. bool CSocket::set_nagle(bool on)
  686. {
  687. bool ret = nagling;
  688. nagling = on;
  689. int enabled = !on;
  690. if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char*)&enabled, sizeof(enabled)) != 0) {
  691. nagling = !on;
  692. }
  693. return ret;
  694. }
  695. void CSocket::set_inherit(bool inherit)
  696. {
  697. #ifndef _WIN32
  698. long flag = fcntl(sock, F_GETFD);
  699. if(inherit)
  700. flag &= ~FD_CLOEXEC;
  701. else
  702. flag |= FD_CLOEXEC;
  703. fcntl(sock, F_SETFD, flag);
  704. #endif
  705. }
  706. size32_t CSocket::avail_read()
  707. {
  708. #ifdef _WIN32
  709. u_long avail;
  710. if (ioctlsocket(sock, FIONREAD, &avail)==0)
  711. #else
  712. int avail;
  713. if (ioctl(sock, FIONREAD, &avail)==0)
  714. #endif
  715. return (size32_t)avail;
  716. int err = ERRNO();
  717. LOGERR2(err,1,"avail_read");
  718. return 0;
  719. }
  720. #define PRE_CONN_UNREACH_ELIM 100
  721. int CSocket::pre_connect (bool block)
  722. {
  723. if (NULL == hostname || '\0' == (*hostname))
  724. {
  725. StringBuffer err;
  726. err.appendf("CSocket::pre_connect - Invalid/missing host IP address raised in : %s, line %d",sanitizeSourceFile(__FILE__), __LINE__);
  727. IJSOCK_Exception *e = new SocketException(JSOCKERR_bad_netaddr,err.str());
  728. throw e;
  729. }
  730. DEFINE_SOCKADDR(u);
  731. if (targetip.isNull()) {
  732. set_return_addr(hostport,hostname);
  733. targetip.ipset(returnep);
  734. }
  735. socklen_t ul = setSockAddr(u,targetip,hostport);
  736. sock = ::socket(u.sa.sa_family, SOCK_STREAM, targetip.isIp4()?0:PF_INET6);
  737. owned = true;
  738. state = ss_pre_open; // will be set to open by post_connect
  739. if (sock == INVALID_SOCKET) {
  740. int err = ERRNO();
  741. THROWJSOCKEXCEPTION(err);
  742. }
  743. STATS.activesockets++;
  744. int err = 0;
  745. set_nonblock(!block);
  746. int rc = ::connect(sock, &u.sa, ul);
  747. if (rc==SOCKET_ERROR) {
  748. err = ERRNO();
  749. if ((err != JSE_INPROGRESS)&&(err != JSE_WOULDBLOCK)&&(err != JSE_TIMEDOUT)&&(err!=JSE_CONNREFUSED)) { // handled by caller
  750. if (err != JSE_NETUNREACH) {
  751. pre_conn_unreach_cnt.store(0);
  752. LOGERR2(err,1,"pre_connect");
  753. } else {
  754. int ecnt = pre_conn_unreach_cnt.load();
  755. if (ecnt <= PRE_CONN_UNREACH_ELIM) {
  756. pre_conn_unreach_cnt.fetch_add(1);
  757. LOGERR2(err,1,"pre_connect network unreachable");
  758. }
  759. }
  760. } else
  761. pre_conn_unreach_cnt.store(0);
  762. } else
  763. pre_conn_unreach_cnt.store(0);
  764. #ifdef SOCKTRACE
  765. PROGLOG("SOCKTRACE: pre-connected socket%s %x %d (%p) err=%d", block?"(block)":"", sock, sock, this, err);
  766. #endif
  767. return err;
  768. }
  769. int CSocket::post_connect ()
  770. {
  771. set_nonblock(false);
  772. int err = 0;
  773. socklen_t errlen = sizeof(err);
  774. int rc = getsockopt(sock, SOL_SOCKET, SO_ERROR, (char *)&err, &errlen); // check for error
  775. if ((rc!=0)&&!err)
  776. err = ERRNO(); // some implementations of getsockopt duff
  777. if (err==0) {
  778. nagling = true;
  779. set_nagle(false);
  780. state = ss_open;
  781. SocketEndpoint ep;
  782. localPort = getEndpoint(ep).port;
  783. }
  784. else if ((err!=JSE_TIMEDOUT)&&(err!=JSE_CONNREFUSED)) // handled by caller
  785. LOGERR2(err,1,"post_connect");
  786. return err;
  787. }
  788. void CSocket::open(int listen_queue_size,bool reuseports)
  789. {
  790. if (IP6preferred)
  791. sock = ::socket(AF_INET6, connectionless()?SOCK_DGRAM:SOCK_STREAM, PF_INET6);
  792. else
  793. sock = ::socket(AF_INET, connectionless()?SOCK_DGRAM:SOCK_STREAM, 0);
  794. if (sock == INVALID_SOCKET) {
  795. THROWJSOCKEXCEPTION(ERRNO());
  796. }
  797. STATS.activesockets++;
  798. #ifdef SOCKTRACE
  799. PROGLOG("SOCKTRACE: opened socket %x %d (%p)", sock,sock,this);
  800. #endif
  801. if ((hostport==0)&&(sockmode==sm_udp)) {
  802. state = ss_open;
  803. #ifdef SOCKTRACE
  804. PROGLOG("SOCKTRACE: opened socket return udp");
  805. #endif
  806. set_inherit(false);
  807. return;
  808. }
  809. #ifndef _WIN32
  810. reuseports = true; // for some reason linux requires reuse ports
  811. #endif
  812. if (reuseports) {
  813. int on = 1;
  814. setsockopt( sock, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on));
  815. }
  816. DEFINE_SOCKADDR(u);
  817. socklen_t ul;
  818. if (hostname) {
  819. if (targetip.isNull()) {
  820. set_return_addr(hostport,hostname);
  821. targetip.ipset(returnep);
  822. }
  823. ul = setSockAddr(u,targetip,hostport);
  824. }
  825. else
  826. ul = setSockAddrAny(u,hostport);
  827. int saverr;
  828. if (::bind(sock, &u.sa, ul) != 0) {
  829. saverr = ERRNO();
  830. if (saverr==JSE_ADDRINUSE) { // don't log as error (some usages probe ports)
  831. ErrPortInUse:
  832. closesock();
  833. char msg[1024];
  834. sprintf(msg,"Target: %s, port = %d, Raised in: %s, line %d",tracename,(int)hostport,sanitizeSourceFile(__FILE__), __LINE__);
  835. IJSOCK_Exception *e = new SocketException(JSOCKERR_port_in_use,msg);
  836. throw e;
  837. }
  838. else {
  839. closesock();
  840. THROWJSOCKEXCEPTION(saverr);
  841. }
  842. }
  843. if (!connectionless()) {
  844. if (::listen(sock, listen_queue_size) != 0) {
  845. saverr = ERRNO();
  846. if (saverr==JSE_ADDRINUSE)
  847. goto ErrPortInUse;
  848. closesock();
  849. THROWJSOCKEXCEPTION(saverr);
  850. }
  851. }
  852. if (mcastreq) {
  853. if (setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,(char*)mcastreq, sizeof(*mcastreq))!=0) {
  854. saverr = ERRNO();
  855. closesock();
  856. THROWJSOCKEXCEPTION(saverr);
  857. }
  858. }
  859. state = ss_open;
  860. #ifdef SOCKTRACE
  861. PROGLOG("SOCKTRACE: opened socket return");
  862. #endif
  863. set_inherit(false);
  864. }
  865. ISocket* CSocket::accept(bool allowcancel)
  866. {
  867. if ((accept_cancel_state!=accept_not_cancelled) && allowcancel) {
  868. accept_cancel_state=accept_cancelled;
  869. return NULL;
  870. }
  871. if (state != ss_open) {
  872. ERRLOG("invalid accept, state = %d",(int)state);
  873. THROWJSOCKEXCEPTION(JSOCKERR_not_opened);
  874. }
  875. if (connectionless()) {
  876. THROWJSOCKEXCEPTION(JSOCKERR_connectionless_socket);
  877. }
  878. T_SOCKET newsock;
  879. for (;;) {
  880. in_accept = true;
  881. newsock = (sock!=INVALID_SOCKET)?::accept(sock, NULL, NULL):INVALID_SOCKET;
  882. in_accept = false;
  883. #ifdef SOCKTRACE
  884. PROGLOG("SOCKTRACE: accept created socket %x %d (%p)", newsock,newsock,this);
  885. #endif
  886. if (newsock!=INVALID_SOCKET) {
  887. if ((sock==INVALID_SOCKET)||(accept_cancel_state==accept_cancel_pending)) {
  888. ::close(newsock);
  889. newsock=INVALID_SOCKET;
  890. }
  891. else {
  892. accept_cancel_state = accept_not_cancelled;
  893. break;
  894. }
  895. }
  896. int saverr;
  897. saverr = ERRNO();
  898. if ((sock==INVALID_SOCKET)||(accept_cancel_state==accept_cancel_pending)) {
  899. accept_cancel_state = accept_cancelled;
  900. if (allowcancel)
  901. return NULL;
  902. THROWJSOCKEXCEPTION(JSOCKERR_cancel_accept);
  903. }
  904. if (saverr != JSE_INTR) {
  905. accept_cancel_state = accept_not_cancelled;
  906. THROWJSOCKEXCEPTION(saverr);
  907. }
  908. }
  909. if (state != ss_open) {
  910. accept_cancel_state = accept_cancelled;
  911. if (allowcancel)
  912. return NULL;
  913. THROWJSOCKEXCEPTION(JSOCKERR_cancel_accept);
  914. }
  915. CSocket *ret = new CSocket(newsock,sm_tcp,true);
  916. ret->set_inherit(false);
  917. return ret;
  918. }
  919. void CSocket::set_linger(int lingertime)
  920. {
  921. struct linger l;
  922. l.l_onoff = (lingertime>=0)?1:0;
  923. l.l_linger = (lingertime>=0)?lingertime:0;
  924. if (setsockopt(sock, SOL_SOCKET, SO_LINGER, (char*)&l, sizeof(l)) != 0) {
  925. WARNLOG("Linger not set");
  926. }
  927. }
  928. void CSocket::set_keep_alive(bool set)
  929. {
  930. int on=set?1:0;
  931. if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char*)&on, sizeof(on)) != 0) {
  932. WARNLOG("KeepAlive not set");
  933. }
  934. }
  935. int CSocket::name(char *retname,size32_t namemax)
  936. {
  937. SocketEndpoint ep;
  938. getEndpoint(ep);
  939. if (retname && namemax)
  940. {
  941. StringBuffer s;
  942. ep.getIpText(s);
  943. if (namemax-1<s.length())
  944. s.setLength(namemax-1);
  945. memcpy(retname,s.str(),s.length()+1);
  946. }
  947. return ep.port;
  948. }
  949. int CSocket::peer_name(char *retname,size32_t namemax)
  950. {
  951. // should not raise exceptions
  952. int ret = 0;
  953. if (!retname)
  954. namemax = 0;
  955. if (namemax)
  956. retname[0] = 0;
  957. if (state != ss_open) {
  958. return -1; // don't log as used to test socket
  959. }
  960. StringBuffer s;
  961. if (sockmode==sm_udp_server) { // udp server
  962. returnep.getIpText(s);
  963. ret = returnep.port;
  964. }
  965. else {
  966. DEFINE_SOCKADDR(u);
  967. socklen_t ul = sizeof(u);
  968. if (::getpeername(sock,&u.sa, &ul)<0)
  969. return -1; // don't log as used to test socket
  970. SocketEndpoint ep;
  971. getSockAddrEndpoint(u,ul,ep);
  972. ep.getIpText(s);
  973. ret = ep.port;
  974. }
  975. if (namemax>1) {
  976. if (namemax-1<s.length())
  977. s.setLength(namemax-1);
  978. memcpy(retname,s.str(),s.length()+1);
  979. }
  980. return ret;
  981. }
  982. SocketEndpoint &CSocket::getPeerEndpoint(SocketEndpoint &ep)
  983. {
  984. if (state != ss_open) {
  985. THROWJSOCKEXCEPTION(JSOCKERR_not_opened);
  986. }
  987. StringBuffer s;
  988. if (sockmode==sm_udp_server) { // udp server
  989. ep.set(returnep);
  990. }
  991. else {
  992. DEFINE_SOCKADDR(u);
  993. socklen_t ul = sizeof(u);
  994. if (::getpeername(sock,&u.sa, &ul)<0) {
  995. DBGLOG("getpeername failed %d",ERRNO());
  996. ep.set(NULL, 0);
  997. }
  998. else
  999. getSockAddrEndpoint(u,ul,ep);
  1000. }
  1001. return ep;
  1002. }
  1003. IpAddress & CSocket::getPeerAddress(IpAddress &addr)
  1004. {
  1005. SocketEndpoint ep;
  1006. getPeerEndpoint(ep);
  1007. addr = ep;
  1008. return addr;
  1009. }
  1010. void CSocket::set_return_addr(int port,const char *retname)
  1011. {
  1012. if (!returnep.ipset(retname)) {
  1013. IJSOCK_Exception *e = new SocketException(JSOCKERR_bad_address); // don't use THROWJSOCKEXCEPTION here
  1014. throw e;
  1015. }
  1016. returnep.port = port;
  1017. }
  1018. SocketEndpoint &CSocket::getEndpoint(SocketEndpoint &ep) const
  1019. {
  1020. if (state != ss_open) {
  1021. THROWJSOCKEXCEPTION(JSOCKERR_not_opened);
  1022. }
  1023. DEFINE_SOCKADDR(u);
  1024. socklen_t ul = sizeof(u);
  1025. if (::getsockname(sock,&u.sa, &ul)<0) {
  1026. THROWJSOCKEXCEPTION(ERRNO());
  1027. }
  1028. getSockAddrEndpoint(u,ul,ep);
  1029. return ep;
  1030. }
  1031. void CSocket::cancel_accept()
  1032. {
  1033. if (connectionless()) {
  1034. THROWJSOCKEXCEPTION(JSOCKERR_connectionless_socket);
  1035. }
  1036. #ifdef SOCKTRACE
  1037. PROGLOG("SOCKTRACE: Cancel accept socket %x %d (%p)", sock, sock, this);
  1038. #endif
  1039. if (!in_accept) {
  1040. accept_cancel_state = accept_cancelled;
  1041. shutdown();
  1042. errclose();
  1043. return;
  1044. }
  1045. accept_cancel_state = accept_cancel_pending;
  1046. errclose(); // this should cause accept to terminate (not supported on all linux though)
  1047. #ifdef _WIN32
  1048. for (unsigned i=0;i<5;i++) { // windows closes on different lower priority thread
  1049. Sleep(i);
  1050. if (accept_cancel_state==accept_cancelled)
  1051. return;
  1052. }
  1053. #else
  1054. Sleep(0);
  1055. if (accept_cancel_state==accept_cancelled)
  1056. return;
  1057. #endif
  1058. // Wakeup listener using a connection
  1059. SocketEndpoint ep(hostport,queryHostIP());
  1060. Owned<CSocket> sock = new CSocket(ep,sm_tcp,NULL);
  1061. try {
  1062. sock->connect_timeout(100,true);
  1063. }
  1064. catch (IJSOCK_Exception *e) {
  1065. EXCLOG(e,"CSocket::cancel_eccept");
  1066. e->Release();
  1067. }
  1068. }
  1069. // ================================================================================
  1070. // connect versions
  1071. ISocket* ISocket::connect( const SocketEndpoint &ep )
  1072. {
  1073. // general connect
  1074. return ISocket::connect_wait(ep,DEFAULT_CONNECT_TIME);
  1075. }
  1076. inline void refused_sleep(CTimeMon &tm, unsigned &refuseddelay)
  1077. {
  1078. unsigned remaining;
  1079. if (!tm.timedout(&remaining)) {
  1080. if (refuseddelay<remaining/4) {
  1081. Sleep(refuseddelay);
  1082. if (refuseddelay<CONNECT_TIMEOUT_REFUSED_WAIT/2)
  1083. refuseddelay *=2;
  1084. else
  1085. refuseddelay = CONNECT_TIMEOUT_REFUSED_WAIT;
  1086. }
  1087. else
  1088. Sleep(remaining/4); // towards end of timeout approach gradually
  1089. }
  1090. }
  1091. bool CSocket::connect_timeout( unsigned timeout, bool noexception)
  1092. {
  1093. // simple connect with timeout (no fancy stuff!)
  1094. unsigned startt = usTick();
  1095. CTimeMon tm(timeout);
  1096. unsigned remaining;
  1097. unsigned refuseddelay = 1;
  1098. int err;
  1099. while (!tm.timedout(&remaining))
  1100. {
  1101. err = pre_connect(false);
  1102. if ((err == JSE_INPROGRESS)||(err == JSE_WOULDBLOCK))
  1103. {
  1104. #ifdef _USE_SELECT
  1105. T_FD_SET fds;
  1106. struct timeval tv;
  1107. CHECKSOCKRANGE(sock);
  1108. XFD_ZERO(&fds);
  1109. FD_SET((unsigned)sock, &fds);
  1110. T_FD_SET except;
  1111. XFD_ZERO(&except);
  1112. FD_SET((unsigned)sock, &except);
  1113. tv.tv_sec = remaining / 1000;
  1114. tv.tv_usec = (remaining % 1000)*1000;
  1115. int rc = ::select( sock + 1, NULL, (fd_set *)&fds, (fd_set *)&except, &tv );
  1116. #else
  1117. struct pollfd fds[1];
  1118. fds[0].fd = sock;
  1119. fds[0].events = POLLOUT;
  1120. fds[0].revents = 0;
  1121. int rc = ::poll(fds, 1, remaining);
  1122. #endif
  1123. if (rc>0)
  1124. {
  1125. // select/poll succeeded - return error from socket (0 if connected)
  1126. socklen_t errlen = sizeof(err);
  1127. rc = getsockopt(sock, SOL_SOCKET, SO_ERROR, (char *)&err, &errlen); // check for error
  1128. if ((rc!=0)&&!err)
  1129. err = ERRNO(); // some implementations of getsockopt duff
  1130. if (err) // probably ECONNREFUSED but treat all errors same
  1131. refused_sleep(tm,refuseddelay);
  1132. }
  1133. else if (rc<0)
  1134. {
  1135. err = ERRNO();
  1136. LOGERR2(err,2,"::select/poll");
  1137. }
  1138. }
  1139. if (err==0)
  1140. {
  1141. err = post_connect();
  1142. if (err==0)
  1143. {
  1144. STATS.connects++;
  1145. STATS.connecttime+=usTick()-startt;
  1146. #ifdef _TRACE
  1147. setTraceName();
  1148. #endif
  1149. #ifdef SOCKTRACE
  1150. unsigned conn_mstime = (usTick() - startt) / 1000;
  1151. logConnectionInfo(timeout, conn_mstime);
  1152. #endif
  1153. return true;
  1154. }
  1155. }
  1156. errclose();
  1157. }
  1158. #ifdef SOCKTRACE
  1159. PROGLOG("connect_timeout: failed %d",err);
  1160. #endif
  1161. STATS.failedconnects++;
  1162. STATS.failedconnecttime+=usTick()-startt;
  1163. if (!noexception)
  1164. THROWJSOCKEXCEPTION(JSOCKERR_connection_failed);
  1165. return false;
  1166. }
  1167. ISocket* ISocket::connect_timeout(const SocketEndpoint &ep,unsigned timeout)
  1168. {
  1169. if (ep.isNull()||(ep.port==0))
  1170. THROWJSOCKEXCEPTION2(JSOCKERR_bad_address);
  1171. Owned<CSocket> sock = new CSocket(ep,sm_tcp,NULL);
  1172. sock->connect_timeout(timeout,false);
  1173. return sock.getClear();
  1174. }
  #define POLLTIME 50
  // Connect this socket, retrying for up to `timems` milliseconds.
  //
  // The loop always makes one more pass after the timer expires, and that final
  // pass uses a blocking pre_connect(). A blocking pre_connect() is also used
  // whenever more than four connects are in flight (connectingcount, guarded by
  // crit). Otherwise the connect is started non-blocking and the socket is polled
  // (sleeping 1..POLLTIME ms between polls unless BLOCK_POLLED_SINGLE_CONNECTS is
  // defined) until it resolves or its per-attempt budget runs out.
  //
  // Returns normally once post_connect() succeeds; throws
  // JSOCKERR_connection_failed after the final pass fails.
  void CSocket::connect_wait(unsigned timems)
  {
      // simple connect with timeout (no fancy stuff!)
      const unsigned startt = usTick();
      CTimeMon tm(timems);
      unsigned refuseddelay = 1;
      int err;
      bool expired = false;
      while (!expired)
      {
#ifdef CENTRAL_NODE_RANDOM_DELAY
          ForEachItemIn(cn,CentralNodeArray) {
              const SocketEndpoint &ep=CentralNodeArray.item(cn);
              if (ep.ipequals(targetip)) {
                  unsigned sleeptime = getRandom() % 1000;
                  StringBuffer s;
                  ep.getIpText(s);
                  PrintLog("Connection to central node %s - sleeping %d milliseconds", s.str(), sleeptime);
                  Sleep(sleeptime);
                  break;
              }
          }
#endif
          unsigned remaining;
          expired = tm.timedout(&remaining);
          bool blockselect = expired; // if last time round block
          {
              CriticalBlock block(crit);
              ++connectingcount;
              if (connectingcount > 4)
                  blockselect = true;
          }
          err = pre_connect(blockselect);
          if (blockselect)
          {
              if (err && !expired)
                  refused_sleep(tm,refuseddelay); // probably ECONNREFUSED but treat all errors same
          }
          else
          {
              // Here blockselect is false, hence the timer has not expired either,
              // so only 'remaining' decides the per-attempt budget (minimum 10s).
              unsigned timeoutms = (remaining < 10000) ? 10000 : remaining;
#ifndef BLOCK_POLLED_SINGLE_CONNECTS
              unsigned polltime = 1;
#endif
              while ((err == JSE_INPROGRESS) || (err == JSE_WOULDBLOCK))
              {
#ifdef _USE_SELECT
                  T_FD_SET wfds;
                  struct timeval tv;
                  CHECKSOCKRANGE(sock);
                  XFD_ZERO(&wfds);
                  FD_SET((unsigned)sock, &wfds);
                  T_FD_SET efds;
                  XFD_ZERO(&efds);
                  FD_SET((unsigned)sock, &efds);
#ifdef BLOCK_POLLED_SINGLE_CONNECTS
                  tv.tv_sec = timeoutms / 1000;
                  tv.tv_usec = (timeoutms % 1000)*1000;
#else
                  tv.tv_sec = 0;
                  tv.tv_usec = 0;
#endif
                  int rc = ::select( sock + 1, NULL, (fd_set *)&wfds, (fd_set *)&efds, &tv );
#else
                  struct pollfd pfd;
                  pfd.fd = sock;
                  pfd.events = POLLOUT;
                  pfd.revents = 0;
#ifdef BLOCK_POLLED_SINGLE_CONNECTS
                  int pollTimeOut = timeoutms;
#else
                  int pollTimeOut = 0;
#endif
                  int rc = ::poll(&pfd, 1, pollTimeOut);
#endif
                  if (rc > 0)
                  {
                      // select/poll succeeded - return error from socket (0 if connected)
                      socklen_t errlen = sizeof(err);
                      rc = getsockopt(sock, SOL_SOCKET, SO_ERROR, (char *)&err, &errlen); // check for error
                      if ((rc != 0) && !err)
                          err = ERRNO();  // some implementations of getsockopt duff
                      if (err)
                          refused_sleep(tm,refuseddelay); // probably ECONNREFUSED but treat all errors same
                      break;
                  }
                  else if (rc < 0)
                  {
                      err = ERRNO();
                      LOGERR2(err,2,"::select/poll");
                      break;
                  }
                  else if (!timeoutms)
                  {
#ifdef SOCKTRACE
                      PROGLOG("connecttimeout: timed out");
#endif
                      err = -1;
                      break;
                  }
#ifdef BLOCK_POLLED_SINGLE_CONNECTS
                  break;
#else
                  if (timeoutms < polltime)
                      polltime = timeoutms;
                  Sleep(polltime);    // sleeps 1-50ms (to let other threads run)
                  timeoutms -= polltime;
                  polltime = (polltime > POLLTIME/2) ? POLLTIME : polltime*2;
#endif
              }
          }
          {
              CriticalBlock block(crit);
              --connectingcount;
          }
          if (err == 0)
          {
              err = post_connect();
              if (err == 0)
              {
                  STATS.connects++;
                  STATS.connecttime += usTick()-startt;
#ifdef _TRACE
                  setTraceName();
#endif
#ifdef SOCKTRACE
                  unsigned conn_mstime = (usTick() - startt) / 1000;
                  logConnectionInfo(timems, conn_mstime);
#endif
                  return;
              }
          }
          errclose();
      }
#ifdef SOCKTRACE
      PROGLOG("connect_wait: failed %d",err);
#endif
      STATS.failedconnects++;
      STATS.failedconnecttime += usTick()-startt;
      THROWJSOCKEXCEPTION(JSOCKERR_connection_failed);
  }
  1318. void CSocket::setTraceName(const char * prefix, const char * name)
  1319. {
  1320. #ifdef _TRACE
  1321. StringBuffer peer;
  1322. peer.append(prefix);
  1323. if (state == ss_open)
  1324. peer.append(":").append(localPort).append(" -> ");
  1325. peer.append(name?name:"(NULL)");
  1326. peer.append(":").append(hostport);
  1327. if (sock != INVALID_SOCKET)
  1328. peer.append(" (").append(sock).append(")");
  1329. free(tracename);
  1330. tracename = peer.detach();
  1331. #endif
  1332. }
  1333. void CSocket::setTraceName()
  1334. {
  1335. #ifdef _TRACE
  1336. setTraceName("C!", hostname);
  1337. #endif
  1338. }
  1339. ISocket* ISocket::connect_wait( const SocketEndpoint &ep, unsigned timems)
  1340. {
  1341. if (ep.isNull()||(ep.port==0))
  1342. THROWJSOCKEXCEPTION2(JSOCKERR_bad_address);
  1343. Owned<CSocket> sock = new CSocket(ep,sm_tcp,NULL);
  1344. sock->connect_wait(timems);
  1345. return sock.getClear();
  1346. }
  1347. void CSocket::udpconnect()
  1348. {
  1349. DEFINE_SOCKADDR(u);
  1350. if (targetip.isNull()) {
  1351. set_return_addr(hostport,hostname);
  1352. targetip.ipset(returnep);
  1353. }
  1354. socklen_t ul = setSockAddr(u,targetip,hostport);
  1355. sock = ::socket(u.sa.sa_family, SOCK_DGRAM, targetip.isIp4()?0:PF_INET6);
  1356. #ifdef SOCKTRACE
  1357. PROGLOG("SOCKTRACE: udp connected socket %x %d (%p)", sock, sock, this);
  1358. #endif
  1359. STATS.activesockets++;
  1360. if (sock == INVALID_SOCKET) {
  1361. THROWJSOCKEXCEPTION(ERRNO());
  1362. }
  1363. int res = ::connect(sock, &u.sa, ul);
  1364. if (res != 0) { // works for UDP
  1365. closesock();
  1366. THROWJSOCKEXCEPTION(JSOCKERR_connection_failed);
  1367. }
  1368. nagling = false; // means nothing for UDP
  1369. state = ss_open;
  1370. #ifdef _TRACE
  1371. setTraceName();
  1372. #endif
  1373. }
  1374. void CSocket::logPollError(unsigned revents, const char *rwstr)
  1375. {
  1376. if (revents & POLLERR)
  1377. {
  1378. StringBuffer errStr;
  1379. errStr.appendf("%s POLLERR %u l:%d r:%s:%d", rwstr, sock, localPort, (hostname?hostname:"NULL"), hostport);
  1380. int serror = 0;
  1381. socklen_t serrlen = sizeof(serror);
  1382. int srtn = getsockopt(sock, SOL_SOCKET, SO_ERROR, (char *)&serror, &serrlen);
  1383. if (srtn != 0)
  1384. serror = ERRNO();
  1385. LOGERR2(serror,2,errStr.str());
  1386. }
  1387. else if (revents & POLLNVAL)
  1388. {
  1389. StringBuffer errStr;
  1390. errStr.appendf("%s POLLINVAL", rwstr);
  1391. LOGERR2(999,3,errStr.str());
  1392. }
  1393. else
  1394. {
  1395. StringBuffer errStr;
  1396. errStr.appendf("%s unknown poll() revents: 0x%x", rwstr, revents);
  1397. LOGERR2(999,4,errStr.str());
  1398. }
  1399. }
  1400. int CSocket::wait_read(unsigned timeout)
  1401. {
  1402. int ret = 0;
  1403. while (sock!=INVALID_SOCKET)
  1404. {
  1405. #ifdef _USE_SELECT
  1406. T_FD_SET fds;
  1407. CHECKSOCKRANGE(sock);
  1408. XFD_ZERO(&fds);
  1409. FD_SET((unsigned)sock, &fds);
  1410. if (timeout==WAIT_FOREVER)
  1411. {
  1412. ret = ::select( sock + 1, (fd_set *)&fds, NULL, NULL, NULL );
  1413. }
  1414. else
  1415. {
  1416. struct timeval tv;
  1417. tv.tv_sec = timeout / 1000;
  1418. tv.tv_usec = (timeout % 1000)*1000;
  1419. ret = ::select( sock + 1, (fd_set *)&fds, NULL, NULL, &tv );
  1420. }
  1421. #else
  1422. struct pollfd fds[1];
  1423. fds[0].fd = sock;
  1424. fds[0].events = POLLIN;
  1425. fds[0].revents = 0;
  1426. ret = ::poll(fds, 1, timeout);
  1427. #endif
  1428. if (ret == SOCKET_ERROR)
  1429. { // error
  1430. int err = ERRNO();
  1431. if (err!=JSE_INTR)
  1432. { // else retry (should adjust time but for our usage don't think it matters that much)
  1433. LOGERR2(err,1,"wait_read");
  1434. break;
  1435. }
  1436. }
  1437. else if (ret == 0)
  1438. { // timeout
  1439. break;
  1440. }
  1441. else
  1442. { // ret > 0 - ready or error
  1443. #ifdef _USE_SELECT
  1444. if (!FD_ISSET(sock, &fds))
  1445. {
  1446. LOGERR2(998,7,"wait_read");
  1447. ret = -1;
  1448. }
  1449. #else
  1450. if ( (fds[0].revents & (POLLERR | POLLNVAL)) || (!(fds[0].revents & (POLLIN | POLLHUP))) )
  1451. {
  1452. logPollError(fds[0].revents, "wait_read");
  1453. ret = -1;
  1454. }
  1455. else
  1456. {
  1457. // POLLIN | POLLHUP ok
  1458. break;
  1459. }
  1460. #endif
  1461. break;
  1462. }
  1463. }
  1464. return ret;
  1465. }
  1466. int CSocket::wait_write(unsigned timeout)
  1467. {
  1468. int ret = 0;
  1469. while (sock!=INVALID_SOCKET) {
  1470. #ifdef _USE_SELECT
  1471. T_FD_SET fds;
  1472. CHECKSOCKRANGE(sock);
  1473. XFD_ZERO(&fds);
  1474. FD_SET((unsigned)sock, &fds);
  1475. if (timeout==WAIT_FOREVER) {
  1476. ret = ::select( sock + 1, NULL, (fd_set *)&fds, NULL, NULL );
  1477. }
  1478. else {
  1479. struct timeval tv;
  1480. tv.tv_sec = timeout / 1000;
  1481. tv.tv_usec = (timeout % 1000)*1000;
  1482. ret = ::select( sock + 1, NULL, (fd_set *)&fds, NULL, &tv );
  1483. }
  1484. #else
  1485. struct pollfd fds[1];
  1486. fds[0].fd = sock;
  1487. fds[0].events = POLLOUT;
  1488. fds[0].revents = 0;
  1489. ret = ::poll(fds, 1, timeout);
  1490. #endif
  1491. if (ret==SOCKET_ERROR)
  1492. {
  1493. int err = ERRNO();
  1494. if (err!=JSE_INTR)
  1495. { // else retry (should adjust time but for our usage don't think it matters that much)
  1496. LOGERR2(err,1,"wait_write");
  1497. break;
  1498. }
  1499. }
  1500. else if (ret == 0)
  1501. { // timeout
  1502. break;
  1503. }
  1504. else
  1505. { // ret > 0 - ready or error
  1506. #ifdef _USE_SELECT
  1507. if (!FD_ISSET(sock, &fds))
  1508. {
  1509. LOGERR2(998,7,"wait_write");
  1510. ret = -1;
  1511. }
  1512. #else
  1513. if ( (fds[0].revents & (POLLERR | POLLNVAL)) || (!(fds[0].revents & (POLLOUT | POLLHUP))) )
  1514. {
  1515. logPollError(fds[0].revents, "wait_write");
  1516. ret = -1;
  1517. }
  1518. else if (fds[0].revents & POLLHUP)
  1519. {
  1520. LOGERR2(998,5,"wait_write POLLHUP");
  1521. ret = -1;
  1522. }
  1523. else
  1524. {
  1525. // POLLOUT ok
  1526. break;
  1527. }
  1528. #endif
  1529. break;
  1530. }
  1531. }
  1532. return ret;
  1533. }
  1534. void CSocket::readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
  1535. unsigned timeoutms)
  1536. {
  1537. if (timeoutms == WAIT_FOREVER) {
  1538. read(buf,min_size, max_size, size_read,WAIT_FOREVER);
  1539. return;
  1540. }
  1541. unsigned startt=usTick();
  1542. size_read = 0;
  1543. if (state != ss_open) {
  1544. THROWJSOCKEXCEPTION(JSOCKERR_not_opened);
  1545. }
  1546. unsigned start;
  1547. unsigned timeleft;
  1548. start = msTick();
  1549. timeleft = timeoutms;
  1550. do {
  1551. int rc = wait_read(timeleft);
  1552. if (rc < 0) {
  1553. THROWJSOCKEXCEPTION(ERRNO());
  1554. }
  1555. if (rc == 0) {
  1556. THROWJSOCKEXCEPTION(JSOCKERR_timeout_expired);
  1557. }
  1558. unsigned elapsed = (msTick()-start);
  1559. if (elapsed<timeoutms)
  1560. timeleft = timeoutms-elapsed;
  1561. else
  1562. timeleft = 0;
  1563. unsigned retrycount=100;
  1564. EintrRetry:
  1565. if (sockmode==sm_udp_server) { // udp server
  1566. DEFINE_SOCKADDR(u);
  1567. socklen_t ul=sizeof(u);
  1568. rc = recvfrom(sock, (char*)buf + size_read, max_size - size_read, 0, &u.sa,&ul);
  1569. getSockAddrEndpoint(u,ul,returnep);
  1570. }
  1571. else {
  1572. rc = recv(sock, (char*)buf + size_read, max_size - size_read, 0);
  1573. }
  1574. if (rc < 0) {
  1575. int err = ERRNO();
  1576. if (BADSOCKERR(err)) {
  1577. // don't think this should happen but convert to same as shutdown while investigation
  1578. LOGERR2(err,1,"Socket closed during read");
  1579. rc = 0;
  1580. }
  1581. else if ((err==JSE_INTR)&&(retrycount--!=0)) {
  1582. LOGERR2(err,1,"EINTR retrying");
  1583. goto EintrRetry;
  1584. }
  1585. else {
  1586. VStringBuffer errMsg("readtms(timeoutms=%d)", timeoutms);
  1587. LOGERR2(err,1,errMsg.str());
  1588. if ((err==JSE_CONNRESET)||(err==JSE_INTR)||(err==JSE_CONNABORTED)) {
  1589. errclose();
  1590. err = JSOCKERR_broken_pipe;
  1591. }
  1592. THROWJSOCKEXCEPTION(err);
  1593. }
  1594. }
  1595. if (rc == 0) {
  1596. state = ss_shutdown;
  1597. if (min_size==0)
  1598. break; // if min_read is 0 return 0 if socket closed
  1599. THROWJSOCKEXCEPTION(JSOCKERR_graceful_close);
  1600. }
  1601. size_read += rc;
  1602. } while (size_read < min_size);
  1603. STATS.reads++;
  1604. STATS.readsize += size_read;
  1605. STATS.readtime+=usTick()-startt;
  1606. }
  1607. void CSocket::read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
  1608. unsigned timeoutsecs)
  1609. {
  1610. unsigned startt=usTick();
  1611. size_read = 0;
  1612. unsigned start = 0;
  1613. unsigned timeleft = 0;
  1614. if (state != ss_open) {
  1615. THROWJSOCKEXCEPTION(JSOCKERR_not_opened);
  1616. }
  1617. if (timeoutsecs != WAIT_FOREVER) {
  1618. start = (unsigned)time(NULL);
  1619. timeleft = timeoutsecs;
  1620. }
  1621. do {
  1622. int rc;
  1623. if (timeoutsecs != WAIT_FOREVER) {
  1624. rc = wait_read(timeleft*1000);
  1625. if (rc < 0) {
  1626. THROWJSOCKEXCEPTION(ERRNO());
  1627. }
  1628. if (rc == 0) {
  1629. THROWJSOCKEXCEPTION(JSOCKERR_timeout_expired);
  1630. }
  1631. unsigned elapsed = ((unsigned)time(NULL))-start;
  1632. if (elapsed<timeoutsecs)
  1633. timeleft = timeoutsecs-elapsed;
  1634. else
  1635. timeleft = 0;
  1636. }
  1637. unsigned retrycount=100;
  1638. EintrRetry:
  1639. if (sockmode==sm_udp_server) { // udp server
  1640. DEFINE_SOCKADDR(u);
  1641. socklen_t ul=sizeof(u.sin);
  1642. rc = recvfrom(sock, (char*)buf + size_read, max_size - size_read, 0, &u.sa,&ul);
  1643. getSockAddrEndpoint(u,ul,returnep);
  1644. }
  1645. else {
  1646. rc = recv(sock, (char*)buf + size_read, max_size - size_read, 0);
  1647. }
  1648. if (rc < 0) {
  1649. int err = ERRNO();
  1650. if (BADSOCKERR(err)) {
  1651. // don't think this should happen but convert to same as shutdown while investigation
  1652. LOGERR2(err,3,"Socket closed during read");
  1653. rc = 0;
  1654. }
  1655. else if ((err==JSE_INTR)&&(retrycount--!=0)) {
  1656. if (sock==INVALID_SOCKET)
  1657. rc = 0; // convert an EINTR after closed to a graceful close
  1658. else {
  1659. LOGERR2(err,3,"EINTR retrying");
  1660. goto EintrRetry;
  1661. }
  1662. }
  1663. else {
  1664. LOGERR2(err,3,"read");
  1665. if ((err==JSE_CONNRESET)||(err==JSE_INTR)||(err==JSE_CONNABORTED)) {
  1666. errclose();
  1667. err = JSOCKERR_broken_pipe;
  1668. }
  1669. THROWJSOCKEXCEPTION(err);
  1670. }
  1671. }
  1672. if (rc == 0) {
  1673. state = ss_shutdown;
  1674. if (min_size==0)
  1675. break; // if min_read is 0 return 0 if socket closed
  1676. THROWJSOCKEXCEPTION(JSOCKERR_graceful_close);
  1677. }
  1678. size_read += rc;
  1679. } while (size_read < min_size);
  1680. STATS.reads++;
  1681. STATS.readsize += size_read;
  1682. STATS.readtime+=usTick()-startt;
  1683. }
  1684. void CSocket::read(void* buf, size32_t size)
  1685. {
  1686. if (!size)
  1687. return;
  1688. unsigned startt=usTick();
  1689. size32_t size_read=size;
  1690. if (state != ss_open) {
  1691. THROWJSOCKEXCEPTION(JSOCKERR_not_opened);
  1692. }
  1693. do {
  1694. unsigned retrycount=100;
  1695. EintrRetry:
  1696. int rc;
  1697. if (sockmode==sm_udp_server) { // udp server
  1698. DEFINE_SOCKADDR(u);
  1699. socklen_t ul=sizeof(u.sin);
  1700. rc = recvfrom(sock, (char*)buf, size, 0, &u.sa,&ul);
  1701. getSockAddrEndpoint(u,ul,returnep);
  1702. }
  1703. else {
  1704. rc = recv(sock, (char*)buf, size, 0);
  1705. }
  1706. if (rc < 0) {
  1707. int err = ERRNO();
  1708. if (BADSOCKERR(err)) {
  1709. // don't think this should happen but convert to same as shutdown while investigation
  1710. LOGERR2(err,5,"Socket closed during read");
  1711. rc = 0;
  1712. }
  1713. else if ((err==JSE_INTR)&&(retrycount--!=0)) {
  1714. LOGERR2(err,5,"EINTR retrying");
  1715. goto EintrRetry;
  1716. }
  1717. else {
  1718. LOGERR2(err,5,"read");
  1719. if ((err==JSE_CONNRESET)||(err==JSE_INTR)||(err==JSE_CONNABORTED)) {
  1720. errclose();
  1721. err = JSOCKERR_broken_pipe;
  1722. }
  1723. THROWJSOCKEXCEPTION(err);
  1724. }
  1725. }
  1726. if (rc == 0) {
  1727. state = ss_shutdown;
  1728. THROWJSOCKEXCEPTION(JSOCKERR_graceful_close);
  1729. }
  1730. buf = (char*)buf + rc;
  1731. size -= rc;
  1732. } while (size != 0);
  1733. STATS.reads++;
  1734. STATS.readsize += size_read;
  1735. STATS.readtime+=usTick()-startt;
  1736. }
  1737. size32_t CSocket::write(void const* buf, size32_t size)
  1738. {
  1739. if (size==0)
  1740. return 0;
  1741. unsigned startt=usTick();
  1742. size32_t size_writ = size;
  1743. if (state != ss_open) {
  1744. THROWJSOCKEXCEPTION(JSOCKERR_not_opened);
  1745. }
  1746. size32_t res=0;
  1747. do {
  1748. unsigned retrycount=100;
  1749. EintrRetry:
  1750. int rc;
  1751. if (sockmode==sm_udp_server) { // udp server
  1752. DEFINE_SOCKADDR(u);
  1753. socklen_t ul = setSockAddr(u,returnep,returnep.port);
  1754. rc = sendto(sock, (char*)buf, size, 0, &u.sa, ul);
  1755. }
  1756. else {
  1757. rc = send(sock, (char*)buf, size, SEND_FLAGS);
  1758. }
  1759. if (rc < 0) {
  1760. int err=ERRNO();
  1761. if (BADSOCKERR(err)) {
  1762. LOGERR2(err,7,"Socket closed during write");
  1763. rc = 0;
  1764. }
  1765. else if ((err==JSE_INTR)&&(retrycount--!=0)) {
  1766. LOGERR2(err,7,"EINTR retrying");
  1767. goto EintrRetry;
  1768. }
  1769. else {
  1770. if (((sockmode==sm_multicast)||(sockmode==sm_udp))&&(err==JSE_CONNREFUSED))
  1771. break; // ignore
  1772. LOGERR2(err,7,"write");
  1773. if ((err==JSE_CONNRESET)||(err==JSE_INTR)||(err==JSE_CONNABORTED)
  1774. #ifndef _WIN32
  1775. ||(err==EPIPE)||(err==JSE_TIMEDOUT) // linux can raise these on broken pipe
  1776. #endif
  1777. ) {
  1778. errclose();
  1779. err = JSOCKERR_broken_pipe;
  1780. }
  1781. if ((err == JSE_WOULDBLOCK) && nonblocking)
  1782. break;
  1783. THROWJSOCKEXCEPTION(err);
  1784. }
  1785. }
  1786. res += rc;
  1787. if (rc == 0) {
  1788. state = ss_shutdown;
  1789. THROWJSOCKEXCEPTION(JSOCKERR_graceful_close);
  1790. }
  1791. if (nonblocking)
  1792. break;
  1793. buf = (char*)buf + rc;
  1794. size -= rc;
  1795. } while (size != 0);
  1796. STATS.writes++;
  1797. STATS.writesize += size_writ;
  1798. STATS.writetime+=usTick()-startt;
  1799. return res;
  1800. }
  1801. size32_t CSocket::writetms(void const* buf, size32_t size, unsigned timeoutms)
  1802. {
  1803. if (size==0)
  1804. return 0;
  1805. if (state != ss_open)
  1806. {
  1807. THROWJSOCKEXCEPTION(JSOCKERR_not_opened);
  1808. }
  1809. if (timeoutms == WAIT_FOREVER)
  1810. return write(buf, size);
  1811. const char *p = (const char *)buf;
  1812. unsigned start, elapsed;
  1813. start = msTick();
  1814. elapsed = 0;
  1815. size32_t nwritten = 0;
  1816. size32_t nleft = size;
  1817. unsigned rollover = 0;
  1818. bool prevblock = set_nonblock(true);
  1819. while ( (nwritten < size) && (elapsed <= timeoutms) )
  1820. {
  1821. size32_t amnt = write(p,nleft);
  1822. if ( ((amnt == (size32_t)-1) || (amnt == 0)) && (++rollover >= 20) )
  1823. {
  1824. rollover = 0;
  1825. Sleep(20);
  1826. }
  1827. else
  1828. {
  1829. nwritten += amnt;
  1830. nleft -= amnt;
  1831. p += amnt;
  1832. }
  1833. elapsed = msTick() - start;
  1834. }
  1835. set_nonblock(prevblock);
  1836. if (nwritten < size)
  1837. {
  1838. ERRLOG("writetms timed out; timeout: %u, nwritten: %u, size: %u", timeoutms, nwritten, size);
  1839. THROWJSOCKEXCEPTION(JSOCKERR_timeout_expired);
  1840. }
  1841. return nwritten;
  1842. }
  1843. bool CSocket::check_connection()
  1844. {
  1845. if (state != ss_open)
  1846. return false;
  1847. unsigned retrycount=100;
  1848. EintrRetry:
  1849. int rc;
  1850. if (sockmode==sm_udp_server) { // udp server
  1851. DEFINE_SOCKADDR(u);
  1852. socklen_t ul = setSockAddr(u,returnep,returnep.port);
  1853. rc = sendto(sock, NULL, 0, 0, &u.sa, ul);
  1854. }
  1855. else {
  1856. rc = send(sock, NULL, 0, SEND_FLAGS);
  1857. }
  1858. if (rc < 0) {
  1859. int err=ERRNO();
  1860. if ((err==JSE_INTR)&&(retrycount--!=0)) {
  1861. LOGERR2(err,7,"EINTR retrying");
  1862. goto EintrRetry;
  1863. }
  1864. else
  1865. return false;
  1866. }
  1867. return true;
  1868. }
  1869. size32_t CSocket::udp_write_to(const SocketEndpoint &ep, void const* buf, size32_t size)
  1870. {
  1871. if (size==0)
  1872. return 0;
  1873. unsigned startt=usTick();
  1874. if (state != ss_open) {
  1875. THROWJSOCKEXCEPTION(JSOCKERR_not_opened);
  1876. }
  1877. size32_t res=0;
  1878. DEFINE_SOCKADDR(u);
  1879. for (;;) {
  1880. socklen_t ul = setSockAddr(u,ep,ep.port);
  1881. int rc = sendto(sock, (char*)buf, size, 0, &u.sa, ul);
  1882. if (rc < 0) {
  1883. int err=ERRNO();
  1884. if (((sockmode==sm_multicast)||(sockmode==sm_udp))&&(err==JSE_CONNREFUSED))
  1885. break; // ignore
  1886. if (err!=JSE_INTR) {
  1887. THROWJSOCKEXCEPTION(err);
  1888. }
  1889. }
  1890. else {
  1891. res = (size32_t)rc;
  1892. break;
  1893. }
  1894. }
  1895. STATS.writes++;
  1896. STATS.writesize += res;
  1897. STATS.writetime+=usTick()-startt;
  1898. return res;
  1899. }
  1900. size32_t CSocket::write_multiple(unsigned num,const void **buf, size32_t *size)
  1901. {
  1902. assertex(sockmode!=sm_udp_server);
  1903. assertex(!nonblocking);
  1904. if (num==1)
  1905. return write(buf[0],size[0]);
  1906. size32_t total = 0;
  1907. unsigned i;
  1908. for (i=0;i<num;i++)
  1909. total += size[i];
  1910. if (total==0)
  1911. return 0;
  1912. unsigned startt=usTick();
  1913. if (state != ss_open) {
  1914. THROWJSOCKEXCEPTION(JSOCKERR_not_opened);
  1915. }
  1916. size32_t res=0;
  1917. #ifdef _WIN32
  1918. WSABUF *bufs = (WSABUF *)alloca(sizeof(WSABUF)*num);
  1919. for (i=0;i<num;i++) {
  1920. bufs[i].buf = (char *)buf[i];
  1921. bufs[i].len = size[i];
  1922. }
  1923. unsigned retrycount=100;
  1924. EintrRetry:
  1925. DWORD sent;
  1926. if (WSASendTo(sock,bufs,num,&sent,0,NULL,0,NULL,NULL)==SOCKET_ERROR) {
  1927. int err=ERRNO();
  1928. if (BADSOCKERR(err)) {
  1929. LOGERR2(err,8,"Socket closed during write");
  1930. sent = 0;
  1931. }
  1932. else if ((err==JSE_INTR)&&(retrycount--!=0)) {
  1933. LOGERR2(err,8,"EINTR retrying");
  1934. goto EintrRetry;
  1935. }
  1936. else {
  1937. LOGERR2(err,8,"write_multiple");
  1938. if ((err==JSE_CONNRESET)||(err==JSE_INTR)||(err==JSE_CONNABORTED)||(err==JSE_TIMEDOUT)) {
  1939. errclose();
  1940. err = JSOCKERR_broken_pipe;
  1941. }
  1942. THROWJSOCKEXCEPTION(err);
  1943. }
  1944. }
  1945. if (sent == 0) {
  1946. state = ss_shutdown;
  1947. THROWJSOCKEXCEPTION(JSOCKERR_graceful_close);
  1948. }
  1949. res = sent;
  1950. #else
  1951. #ifdef USE_CORK
  1952. if (total>1024) {
  1953. class Copt
  1954. {
  1955. T_SOCKET sock;
  1956. bool nagling;
  1957. public:
  1958. Copt(T_SOCKET _sock,bool _nagling)
  1959. {
  1960. nagling = _nagling;
  1961. int enabled = 1;
  1962. int disabled = 0;
  1963. if (!nagling)
  1964. setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char*)&disabled, sizeof(disabled));
  1965. setsockopt(sock, IPPROTO_TCP, TCP_CORK, (char*)&enabled, sizeof(enabled));
  1966. }
  1967. ~Copt()
  1968. {
  1969. int enabled = 1;
  1970. int disabled = 0;
  1971. setsockopt(sock, IPPROTO_TCP, TCP_CORK, (char*)&disabled, sizeof(disabled));
  1972. if (!nagling)
  1973. setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char*)&enabled, sizeof(enabled));
  1974. }
  1975. } copt(sock,nagling);
  1976. for (i=0;i<num;i++)
  1977. res += write(buf[i],size[i]);
  1978. }
  1979. else {
  1980. byte b[1024];
  1981. byte *p=b;
  1982. for (i=0;i<num;i++) {
  1983. memcpy(p,buf[i],size[i]);
  1984. p += size[i];
  1985. }
  1986. res = write(b,total);
  1987. }
  1988. #else
  1989. // send in equal chunks about 64K
  1990. unsigned n = (total+0xffff)/0x10000;
  1991. size32_t outbufsize = (total+n-1)/n;
  1992. MemoryAttr ma;
  1993. byte *outbuf = (byte *)ma.allocate(outbufsize);
  1994. i = 0;
  1995. size32_t os = 0;
  1996. size32_t left = total;
  1997. byte *b = NULL;
  1998. size32_t s=0;
  1999. for (;;) {
  2000. while (!s&&(i<num)) {
  2001. b = (byte *)buf[i];
  2002. s = size[i];
  2003. i++;
  2004. }
  2005. if ((os==0)&&(s==left)) {
  2006. write(b,s); // go for it
  2007. break;
  2008. }
  2009. else {
  2010. size32_t cpy = outbufsize-os;
  2011. if (cpy>s)
  2012. cpy = s;
  2013. memcpy(outbuf+os,b,cpy);
  2014. os += cpy;
  2015. left -= cpy;
  2016. s -= cpy;
  2017. b += cpy;
  2018. if (left==0) {
  2019. write(outbuf,os);
  2020. break;
  2021. }
  2022. else if (os==outbufsize) {
  2023. write(outbuf,os);
  2024. os = 0;
  2025. }
  2026. }
  2027. }
  2028. #endif
  2029. #endif
  2030. STATS.writes++;
  2031. STATS.writesize += res;
  2032. STATS.writetime+=usTick()-startt;
  2033. return res;
  2034. }
  2035. bool CSocket::send_block(const void *blk,size32_t sz)
  2036. {
  2037. unsigned startt=usTick();
  2038. #ifdef TRACE_SLOW_BLOCK_TRANSFER
  2039. unsigned startt2 = startt;
  2040. unsigned startt3 = startt;
  2041. #endif
  2042. if (blockflags&BF_SYNC_TRANSFER_PULL) {
  2043. size32_t rd;
  2044. bool eof = true;
  2045. readtms(&eof,sizeof(eof),sizeof(eof),rd,blocktimeoutms);
  2046. if (eof)
  2047. return false;
  2048. #ifdef TRACE_SLOW_BLOCK_TRANSFER
  2049. startt2=usTick();
  2050. #endif
  2051. }
  2052. if (!blk||!sz) {
  2053. sz = 0;
  2054. write(&sz,sizeof(sz));
  2055. try {
  2056. bool reply;
  2057. size32_t rd;
  2058. readtms(&reply,sizeof(reply),sizeof(reply),rd,blocktimeoutms);
  2059. }
  2060. catch (IJSOCK_Exception *e) {
  2061. if ((e->errorCode()!=JSOCKERR_broken_pipe)&&(e->errorCode()!=JSOCKERR_graceful_close))
  2062. EXCLOG(e,"CSocket::send_block");
  2063. e->Release();
  2064. }
  2065. return false;
  2066. }
  2067. size32_t rsz=sz;
  2068. _WINREV(rsz);
  2069. write(&rsz,sizeof(rsz));
  2070. if (blockflags&BF_SYNC_TRANSFER_PUSH) {
  2071. #ifdef TRACE_SLOW_BLOCK_TRANSFER
  2072. startt2=usTick();
  2073. #endif
  2074. size32_t rd;
  2075. bool eof = true;
  2076. readtms(&eof,sizeof(eof),sizeof(eof),rd,blocktimeoutms);
  2077. if (eof)
  2078. return false;
  2079. #ifdef TRACE_SLOW_BLOCK_TRANSFER
  2080. startt3=usTick();
  2081. #endif
  2082. }
  2083. write(blk,sz);
  2084. if (blockflags&BF_RELIABLE_TRANSFER) {
  2085. bool isok=false;
  2086. size32_t rd;
  2087. readtms(&isok,sizeof(isok),sizeof(isok),rd,blocktimeoutms);
  2088. if (!isok)
  2089. return false;
  2090. }
  2091. unsigned nowt = usTick();
  2092. unsigned elapsed = nowt-startt;
  2093. STATS.blocksendtime+=elapsed;
  2094. STATS.numblocksends++;
  2095. STATS.blocksendsize+=sz;
  2096. if (elapsed>STATS.longestblocksend) {
  2097. STATS.longestblocksend = elapsed;
  2098. STATS.longestblocksize = sz;
  2099. }
  2100. #ifdef TRACE_SLOW_BLOCK_TRANSFER
  2101. if (elapsed>1000000*60) // over 1min
  2102. WARNLOG("send_block took %ds to %s (%d,%d,%d)",elapsed/1000000,tracename,startt2-startt,startt3-startt2,nowt-startt3);
  2103. #endif
  2104. return true;
  2105. }
  #ifdef USERECVSEM
  // Scope guard for a semaphore plus its "owned" flag.
  // While a semaphore is attached (via set() or a successful wait()), the
  // destructor clears the flag and signals the semaphore if the flag is still
  // true. clear() detaches without signalling.
  class CSemProtect
  {
      Semaphore *sem;
      bool *owned;
  public:
      CSemProtect() : sem(NULL), owned(NULL) {}

      ~CSemProtect()
      {
          if (!sem)
              return;
          if (*owned) {
              *owned = false;
              sem->signal();
          }
      }

      // Attach to a semaphore/flag pair without waiting.
      void set(Semaphore *_sem,bool *_owned)
      {
          sem = _sem;
          owned = _owned;
      }

      // If the flag is not already set, wait up to 'timeout' on the semaphore.
      // On success marks the flag, attaches to the pair and returns true;
      // returns false (nothing attached) if the wait timed out.
      bool wait(Semaphore *_sem,bool *_owned,unsigned timeout)
      {
          if (!*_owned) {
              if (!_sem->wait(timeout))
                  return false;
          }
          *_owned = true;
          set(_sem,_owned);
          return true;
      }

      // Detach; the destructor will then do nothing.
      void clear()
      {
          sem = NULL;
          owned = NULL;
      }
  };
  #endif
  2135. size32_t CSocket::receive_block_size()
  2136. {
  2137. // assumed always paired with receive_block
  2138. if (nextblocksize) {
  2139. if (blockflags&BF_SYNC_TRANSFER_PULL) {
  2140. bool eof=false;
  2141. write(&eof,sizeof(eof));
  2142. }
  2143. size32_t rd;
  2144. readtms(&nextblocksize,sizeof(nextblocksize),sizeof(nextblocksize),rd,blocktimeoutms);
  2145. _WINREV(nextblocksize);
  2146. if (nextblocksize==0) { // confirm eof
  2147. try {
  2148. bool confirm=true;
  2149. write(&confirm,sizeof(confirm));
  2150. }
  2151. catch (IJSOCK_Exception *e) {
  2152. if ((e->errorCode()!=JSOCKERR_broken_pipe)&&(e->errorCode()!=JSOCKERR_graceful_close))
  2153. EXCLOG(e,"receive_block_size");
  2154. e->Release();
  2155. }
  2156. }
  2157. else if (blockflags&BF_SYNC_TRANSFER_PUSH) { // leaves receiveblocksem clear
  2158. #ifdef USERECVSEM
  2159. CSemProtect semprot; // this will catch exception in write
  2160. while (!semprot.wait(&receiveblocksem,&receiveblocksemowned,60*1000*5))
  2161. WARNLOG("Receive block stalled");
  2162. #endif
  2163. bool eof=false;
  2164. write(&eof,sizeof(eof));
  2165. #ifdef USERECVSEM
  2166. semprot.clear();
  2167. #endif
  2168. }
  2169. }
  2170. return nextblocksize;
  2171. }
  2172. size32_t CSocket::receive_block(void *blk,size32_t maxsize)
  2173. {
  2174. #ifdef USERECVSEM
  2175. CSemProtect semprot; // this will catch exceptions
  2176. #endif
  2177. size32_t sz = nextblocksize;
  2178. if (sz) {
  2179. if (sz==UINT_MAX) { // need to get size
  2180. if (!blk||!maxsize) {
  2181. if (blockflags&BF_SYNC_TRANSFER_PUSH) { // ignore block size
  2182. size32_t rd;
  2183. readtms(&nextblocksize,sizeof(nextblocksize),sizeof(nextblocksize),rd,blocktimeoutms);
  2184. }
  2185. if (blockflags&(BF_SYNC_TRANSFER_PULL|BF_SYNC_TRANSFER_PUSH)) { // signal eof
  2186. bool eof=true;
  2187. write(&eof,sizeof(eof));
  2188. nextblocksize = 0;
  2189. return 0;
  2190. }
  2191. }
  2192. sz = receive_block_size();
  2193. if (!sz)
  2194. return 0;
  2195. }
  2196. unsigned startt=usTick(); // include sem block but not initial handshake
  2197. #ifdef USERECVSEM
  2198. if (blockflags&BF_SYNC_TRANSFER_PUSH) // read_block_size sets semaphore
  2199. semprot.set(&receiveblocksem,&receiveblocksemowned); // this will reset semaphore on exit
  2200. #endif
  2201. nextblocksize = UINT_MAX;
  2202. size32_t rd;
  2203. if (sz<=maxsize) {
  2204. readtms(blk,sz,sz,rd,blocktimeoutms);
  2205. }
  2206. else { // truncate
  2207. readtms(blk,maxsize,maxsize,rd,blocktimeoutms);
  2208. sz -= maxsize;
  2209. void *tmp=malloc(sz);
  2210. readtms(tmp,sz,sz,rd,blocktimeoutms);
  2211. free(tmp);
  2212. sz = maxsize;
  2213. }
  2214. if (blockflags&BF_RELIABLE_TRANSFER) {
  2215. bool isok=true;
  2216. write(&isok,sizeof(isok));
  2217. }
  2218. unsigned elapsed = usTick()-startt;
  2219. STATS.blockrecvtime+=elapsed;
  2220. STATS.numblockrecvs++;
  2221. STATS.blockrecvsize+=sz;
  2222. }
  2223. return sz;
  2224. }
  2225. void CSocket::set_block_mode(unsigned flags, size32_t recsize, unsigned _timeoutms)
  2226. {
  2227. blockflags = flags;
  2228. nextblocksize = UINT_MAX;
  2229. blocktimeoutms = _timeoutms?_timeoutms:WAIT_FOREVER;
  2230. }
  2231. void CSocket::shutdown(unsigned mode)
  2232. {
  2233. if (state == ss_open) {
  2234. state = ss_shutdown;
  2235. #ifdef SOCKTRACE
  2236. PROGLOG("SOCKTRACE: shutdown(%d) socket %x %d (%p)", mode, sock, sock, this);
  2237. #endif
  2238. int rc = ::shutdown(sock, mode);
  2239. if (rc != 0) {
  2240. int err=ERRNO();
  2241. if (err==JSE_NOTCONN) {
  2242. #ifdef _TRACELINKCLOSED
  2243. DBGLOG("CSocket::shutdown(%d) failed, socket: %d", mode, sock);
  2244. #endif
  2245. LOGERR2(err,9,"shutdown");
  2246. err = JSOCKERR_broken_pipe;
  2247. }
  2248. THROWJSOCKEXCEPTION(err);
  2249. }
  2250. }
  2251. }
  2252. void CSocket::errclose()
  2253. {
  2254. #ifdef USERECVSEM
  2255. if (receiveblocksemowned) {
  2256. receiveblocksemowned = false;
  2257. receiveblocksem.signal();
  2258. }
  2259. #endif
  2260. if (state != ss_close) {
  2261. state = ss_close;
  2262. #ifdef SOCKTRACE
  2263. PROGLOG("SOCKTRACE: errclose socket %x %d (%p)", sock, sock, this);
  2264. #endif
  2265. if (mcastreq)
  2266. setsockopt(sock, IPPROTO_IP, IP_DROP_MEMBERSHIP,(char*)mcastreq,sizeof(*mcastreq));
  2267. closesock();
  2268. }
  2269. }
  2270. void CSocket::close()
  2271. {
  2272. #ifdef USERECVSEM
  2273. if (receiveblocksemowned) {
  2274. receiveblocksemowned = false;
  2275. receiveblocksem.signal();
  2276. }
  2277. #endif
  2278. if (state != ss_close) {
  2279. #ifdef SOCKTRACE
  2280. PROGLOG("SOCKTRACE: close socket %x %d (%p)", sock, sock, this);
  2281. #endif
  2282. state = ss_close;
  2283. if (mcastreq)
  2284. setsockopt(sock, IPPROTO_IP, IP_DROP_MEMBERSHIP,(char*)mcastreq,sizeof(*mcastreq));
  2285. if (closesock() != 0) {
  2286. THROWJSOCKEXCEPTION(ERRNO());
  2287. }
  2288. }
  2289. }
  2290. size32_t CSocket::get_max_send_size()
  2291. {
  2292. size32_t maxsend=0;
  2293. socklen_t size = sizeof(maxsend);
  2294. #if _WIN32
  2295. getsockopt(sock, SOL_SOCKET, SO_MAX_MSG_SIZE, (char *) &maxsend, &size);
  2296. #else
  2297. getsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *) &maxsend, &size); // not the same but closest I can find
  2298. #endif
  2299. return maxsend;
  2300. }
  2301. size32_t CSocket::get_send_buffer_size()
  2302. {
  2303. size32_t maxsend=0;
  2304. socklen_t size = sizeof(maxsend);
  2305. getsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *) &maxsend, &size);
  2306. return maxsend;
  2307. }
  2308. void CSocket::set_send_buffer_size(size32_t maxsend)
  2309. {
  2310. if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&maxsend, sizeof(maxsend))!=0) {
  2311. LOGERR2(ERRNO(),1,"setsockopt(SO_SNDBUF)");
  2312. }
  2313. #ifdef CHECKBUFSIZE
  2314. size32_t v;
  2315. if (getsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&v, sizeof(v))!=0) {
  2316. LOGERR2(ERRNO(),1,"getsockopt(SO_SNDBUF)");
  2317. }
  2318. if (v!=maxsend)
  2319. WARNLOG("set_send_buffer_size requested %d, got %d",maxsend,v);
  2320. #endif
  2321. }
  2322. size32_t CSocket::get_receive_buffer_size()
  2323. {
  2324. size32_t max=0;
  2325. socklen_t size = sizeof(max);
  2326. getsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char *) &max, &size);
  2327. return max;
  2328. }
  2329. void CSocket::set_receive_buffer_size(size32_t max)
  2330. {
  2331. if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char *)&max, sizeof(max))!=0) {
  2332. LOGERR2(ERRNO(),1,"setsockopt(SO_RCVBUF)");
  2333. }
  2334. #ifdef CHECKBUFSIZE
  2335. size32_t v;
  2336. if (getsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char *)&v, sizeof(v))!=0) {
  2337. LOGERR2(ERRNO(),1,"getsockopt(SO_RCVBUF)");
  2338. }
  2339. if (v<max)
  2340. WARNLOG("set_receive_buffer_size requested %d, got %d",max,v);
  2341. #endif
  2342. }
  2343. bool CSocket::join_multicast_group(SocketEndpoint &ep)
  2344. {
  2345. StringBuffer s;
  2346. ep.getIpText(s); // will improve later
  2347. MCASTREQ req(s.str());
  2348. if (setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,(char*)&req, sizeof(req))!=0) {
  2349. return false;
  2350. }
  2351. return true;
  2352. }
  2353. bool CSocket::leave_multicast_group(SocketEndpoint &ep)
  2354. {
  2355. StringBuffer s;
  2356. ep.getIpText(s); // will improve later
  2357. MCASTREQ req(s.str());
  2358. if (setsockopt(sock, IPPROTO_IP, IP_DROP_MEMBERSHIP,(char*)&req, sizeof(req))!=0) {
  2359. return false;
  2360. }
  2361. return true;
  2362. }
  2363. void CSocket::set_ttl(unsigned _ttl)
  2364. {
  2365. if (_ttl)
  2366. {
  2367. u_char ttl = _ttl;
  2368. setsockopt(sock, IPPROTO_IP, IP_MULTICAST_TTL, (char *)&ttl, sizeof(ttl));
  2369. }
  2370. #ifdef SOCKTRACE
  2371. int ttl0 = 0;
  2372. socklen_t ttl1 = sizeof(ttl0);
  2373. getsockopt(sock, IPPROTO_IP, IP_MULTICAST_TTL, &ttl0, &ttl1);
  2374. DBGLOG("set_ttl: socket fd: %d requested ttl: %d actual ttl: %d", sock, _ttl, ttl0);
  2375. #endif
  2376. return;
  2377. }
  2378. void CSocket::logConnectionInfo(unsigned timeoutms, unsigned conn_mstime)
  2379. {
  2380. PROGLOG("SOCKTRACE: connect(%u) - time:%u ms fd:%d l:%d r:%s:%d", timeoutms, conn_mstime, sock, localPort, (hostname?hostname:"NULL"), hostport);
  2381. // PrintStackReport();
  2382. }
  2383. bool CSocket::isSecure() const
  2384. {
  2385. return false;
  2386. }
  2387. CSocket::~CSocket()
  2388. {
  2389. if (owned)
  2390. {
  2391. try
  2392. {
  2393. close();
  2394. }
  2395. catch (IException *e)
  2396. {
  2397. EXCLOG(e,"~CSocket close()");
  2398. e->Release();
  2399. }
  2400. }
  2401. free(hostname);
  2402. hostname = NULL;
  2403. #ifdef _TRACE
  2404. free(tracename);
  2405. tracename = NULL;
  2406. #endif
  2407. delete mcastreq;
  2408. }
  2409. CSocket::CSocket(const SocketEndpoint &ep,SOCKETMODE smode,const char *name)
  2410. {
  2411. state = ss_close;
  2412. nonblocking = false;
  2413. #ifdef USERECVSEM
  2414. receiveblocksemowned = false;
  2415. #endif
  2416. nagling = true; // until turned off
  2417. hostport = ep.port;
  2418. localPort = 0;
  2419. hostname = NULL;
  2420. mcastreq = NULL;
  2421. #ifdef _TRACE
  2422. tracename = NULL;
  2423. #endif
  2424. StringBuffer tmp;
  2425. if ((smode==sm_multicast_server)&&(name&&*name)) {
  2426. mcastreq = new MCASTREQ(name);
  2427. }
  2428. else {
  2429. if (!name&&!ep.isNull())
  2430. name = ep.getIpText(tmp).str();
  2431. hostname = name?strdup(name):NULL;
  2432. }
  2433. sock = INVALID_SOCKET;
  2434. sockmode = smode;
  2435. owned = true;
  2436. nextblocksize = 0;
  2437. in_accept = false;
  2438. accept_cancel_state = accept_not_cancelled;
  2439. #ifdef _TRACE
  2440. if (name)
  2441. setTraceName("T>", name);
  2442. else
  2443. {
  2444. StringBuffer hostname;
  2445. SocketEndpoint self;
  2446. self.setLocalHost(0);
  2447. self.getUrlStr(hostname);
  2448. setTraceName("S>", hostname.str());
  2449. }
  2450. #endif
  2451. }
  2452. CSocket::CSocket(T_SOCKET new_sock,SOCKETMODE smode,bool _owned)
  2453. {
  2454. nonblocking = false;
  2455. #ifdef USERECVSEM
  2456. receiveblocksemowned = false;
  2457. #endif
  2458. nagling = true; // until turned off
  2459. sock = new_sock;
  2460. if (new_sock!=INVALID_SOCKET)
  2461. STATS.activesockets++;
  2462. mcastreq = NULL;
  2463. #ifdef _TRACE
  2464. tracename = NULL;
  2465. #endif
  2466. state = ss_open;
  2467. sockmode = smode;
  2468. owned = _owned;
  2469. nextblocksize = 0;
  2470. in_accept = false;
  2471. accept_cancel_state = accept_not_cancelled;
  2472. set_nagle(false);
  2473. //set_linger(DEFAULT_LINGER_TIME); -- experiment with removing this as closesocket should still endevour to send outstanding data
  2474. char peer[256];
  2475. hostport = peer_name(peer,sizeof(peer));
  2476. hostname = strdup(peer);
  2477. SocketEndpoint ep;
  2478. localPort = getEndpoint(ep).port;
  2479. #ifdef _TRACE
  2480. setTraceName("A!", peer);
  2481. #endif
  2482. }
  2483. ISocket* ISocket::create(unsigned short p,int listen_queue_size)
  2484. {
  2485. if (p==0)
  2486. THROWJSOCKEXCEPTION2(JSOCKERR_bad_address);
  2487. SocketEndpoint ep;
  2488. ep.port = p;
  2489. Owned<CSocket> sock = new CSocket(ep,sm_tcp_server,NULL);
  2490. sock->open(listen_queue_size);
  2491. return sock.getClear();
  2492. }
  2493. ISocket* ISocket::create_ip(unsigned short p,const char *host,int listen_queue_size)
  2494. {
  2495. if (p==0)
  2496. THROWJSOCKEXCEPTION2(JSOCKERR_bad_address);
  2497. SocketEndpoint ep(host,p);
  2498. Owned<CSocket> sock = new CSocket(ep,sm_tcp_server,host);
  2499. sock->open(listen_queue_size);
  2500. return sock.getClear();
  2501. }
  2502. ISocket* ISocket::udp_create(unsigned short p)
  2503. {
  2504. SocketEndpoint ep;
  2505. ep.port=p;
  2506. Owned<CSocket> sock = new CSocket(ep,(p==0)?sm_udp:sm_udp_server,NULL);
  2507. sock->open(0);
  2508. return sock.getClear();
  2509. }
  2510. ISocket* ISocket::multicast_create(unsigned short p, const char *mcip, unsigned _ttl)
  2511. {
  2512. if (p==0)
  2513. THROWJSOCKEXCEPTION2(JSOCKERR_bad_address);
  2514. SocketEndpoint ep(mcip,p);
  2515. Owned<CSocket> sock = new CSocket(ep,sm_multicast_server,mcip);
  2516. sock->open(0,true);
  2517. if (_ttl)
  2518. sock->set_ttl(_ttl);
  2519. return sock.getClear();
  2520. }
  2521. ISocket* ISocket::multicast_create(unsigned short p, const IpAddress &ip, unsigned _ttl)
  2522. {
  2523. if (p==0)
  2524. THROWJSOCKEXCEPTION2(JSOCKERR_bad_address);
  2525. SocketEndpoint ep(p, ip);
  2526. StringBuffer tmp;
  2527. Owned<CSocket> sock = new CSocket(ep,sm_multicast_server,ip.getIpText(tmp).str());
  2528. sock->open(0,true);
  2529. if (_ttl)
  2530. sock->set_ttl(_ttl);
  2531. return sock.getClear();
  2532. }
  2533. ISocket* ISocket::udp_connect(unsigned short p, char const* name)
  2534. {
  2535. if (!name||!*name||(p==0))
  2536. THROWJSOCKEXCEPTION2(JSOCKERR_bad_address);
  2537. SocketEndpoint ep(name, p);
  2538. Owned<CSocket> sock = new CSocket(ep,sm_udp,name);
  2539. sock->udpconnect();
  2540. return sock.getClear();
  2541. }
  2542. ISocket* ISocket::udp_connect(const SocketEndpoint &ep)
  2543. {
  2544. Owned<CSocket> sock = new CSocket(ep,sm_udp,NULL);
  2545. sock->udpconnect();
  2546. return sock.getClear();
  2547. }
  2548. ISocket* ISocket::multicast_connect(unsigned short p, char const* mcip, unsigned _ttl)
  2549. {
  2550. if (p==0)
  2551. THROWJSOCKEXCEPTION2(JSOCKERR_bad_address);
  2552. SocketEndpoint ep(mcip,p);
  2553. return multicast_connect(ep, _ttl);
  2554. }
  2555. ISocket* ISocket::multicast_connect(const SocketEndpoint &ep, unsigned _ttl)
  2556. {
  2557. Owned<CSocket> sock = new CSocket(ep,sm_multicast,NULL);
  2558. sock->udpconnect();
  2559. if (_ttl)
  2560. sock->set_ttl(_ttl);
  2561. return sock.getClear();
  2562. }
  2563. ISocket* ISocket::attach(int s, bool tcpip)
  2564. {
  2565. CSocket* sock = new CSocket((SOCKET)s, tcpip?sm_tcp:sm_udp, false);
  2566. return sock;
  2567. }
  2568. bool isInterfaceIp(const IpAddress &ip, const char *ifname)
  2569. {
  2570. #ifdef _WIN32
  2571. return false;
  2572. #else
  2573. int fd = socket(AF_INET, SOCK_DGRAM, 0); // IPV6 TBD
  2574. if (fd<0)
  2575. return false;
  2576. MemoryAttr ma;
  2577. char *buf = (char *)ma.allocate(1024);
  2578. struct ifconf ifc;
  2579. ifc.ifc_len = 1024;
  2580. ifc.ifc_buf = buf;
  2581. if(ioctl(fd, SIOCGIFCONF, &ifc) < 0) // query interfaces
  2582. {
  2583. close(fd);
  2584. return false;
  2585. }
  2586. struct ifreq *ifr = ifc.ifc_req;
  2587. unsigned n = ifc.ifc_len/sizeof(struct ifreq);
  2588. bool match = false;
  2589. for(unsigned i=0; i<n; i++)
  2590. {
  2591. struct ifreq *item = &ifr[i];
  2592. if (ifname&&*ifname)
  2593. if (!WildMatch(item->ifr_name,ifname))
  2594. continue;
  2595. IpAddress iptest((inet_ntoa(((struct sockaddr_in *)&item->ifr_addr)->sin_addr)));
  2596. if (ip.ipequals(iptest))
  2597. {
  2598. match = true;
  2599. break;
  2600. }
  2601. }
  2602. close(fd);
  2603. return match;
  2604. #endif
  2605. }
  2606. bool getInterfaceIp(IpAddress &ip,const char *ifname)
  2607. {
  2608. #if defined(_WIN32) || defined(__APPLE__)
  2609. return false;
  2610. #else
  2611. static bool recursioncheck = false;
  2612. ip.ipset(NULL);
  2613. int fd = socket(AF_INET, SOCK_DGRAM, 0); // IPV6 TBD
  2614. if (fd<0)
  2615. return false;
  2616. MemoryAttr ma;
  2617. char *buf = (char *)ma.allocate(1024);
  2618. struct ifconf ifc;
  2619. ifc.ifc_len = 1024;
  2620. ifc.ifc_buf = buf;
  2621. if(ioctl(fd, SIOCGIFCONF, &ifc) < 0) // query interfaces
  2622. {
  2623. close(fd);
  2624. return false;
  2625. }
  2626. struct ifreq *ifr = ifc.ifc_req;
  2627. unsigned n = ifc.ifc_len/sizeof(struct ifreq);
  2628. for (int loopback = 0; loopback <= 1; loopback++)
  2629. {
  2630. for (unsigned i=0; i<n; i++)
  2631. {
  2632. bool useLoopback = (loopback==1);
  2633. struct ifreq *item = &ifr[i];
  2634. if (ifname&&*ifname)
  2635. if (!WildMatch(item->ifr_name,ifname))
  2636. continue;
  2637. IpAddress iptest((inet_ntoa(((struct sockaddr_in *)&item->ifr_addr)->sin_addr)));
  2638. if (ioctl(fd, SIOCGIFFLAGS, item) < 0)
  2639. {
  2640. if (!recursioncheck) {
  2641. recursioncheck = true;
  2642. DBGLOG("Error retrieving interface flags for interface %s", item->ifr_name);
  2643. recursioncheck = false;
  2644. }
  2645. continue;
  2646. }
  2647. bool isLoopback = iptest.isLoopBack() || ((item->ifr_flags & IFF_LOOPBACK) != 0);
  2648. bool isUp = (item->ifr_flags & IFF_UP) != 0;
  2649. if ((isLoopback==useLoopback) && isUp)
  2650. {
  2651. if (ip.isNull())
  2652. ip.ipset(iptest);
  2653. else if (!PreferredSubnet.isNull()&&!PreferredSubnet.test(ip)&&PreferredSubnet.test(iptest))
  2654. ip.ipset(iptest);
  2655. }
  2656. }
  2657. if (!ip.isNull())
  2658. break;
  2659. }
  2660. close(fd);
  2661. return !ip.isNull();
  2662. #endif
  2663. }
  // Host name cache; filled on first use by GetCachedHostName().
  static StringAttr cachehostname;
  // Host IP cache; filled by GetCachedHostName() or queryHostIP().
  static IpAddress cachehostip;
  // Loopback address returned by queryLocalIP(); filled on first use.
  static IpAddress localhostip;
  // Taken by the functions that fill the caches above, and around
  // gethostbyname() in lookupHostAddress().
  static CriticalSection hostnamesect;
  2668. const char * GetCachedHostName()
  2669. {
  2670. CriticalBlock c(hostnamesect);
  2671. if (!cachehostname.get())
  2672. {
  2673. #ifndef _WIN32
  2674. IpAddress ip;
  2675. const char *ifs = queryEnvironmentConf().queryProp("interface");
  2676. if (getInterfaceIp(ip, ifs))
  2677. {
  2678. StringBuffer ips;
  2679. ip.getIpText(ips);
  2680. if (ips.length())
  2681. {
  2682. cachehostname.set(ips.str());
  2683. cachehostip.ipset(ip);
  2684. return cachehostname.get();
  2685. }
  2686. }
  2687. #endif
  2688. char temp[1024];
  2689. if (gethostname(temp, sizeof(temp))==0)
  2690. cachehostname.set(temp);
  2691. else
  2692. cachehostname.set("localhost"); // assume no NIC card
  2693. }
  2694. return cachehostname.get();
  2695. }
  2696. IpAddress & queryLocalIP()
  2697. {
  2698. CriticalBlock c(hostnamesect);
  2699. if (localhostip.isNull())
  2700. {
  2701. if (IP6preferred)
  2702. localhostip.ipset("::1"); //IPv6
  2703. else
  2704. localhostip.ipset("127.0.0.1"); //IPv4
  2705. }
  2706. return localhostip;
  2707. }
  2708. IpAddress & queryHostIP()
  2709. {
  2710. CriticalBlock c(hostnamesect);
  2711. if (cachehostip.isNull()) {
  2712. if (!cachehostip.ipset(GetCachedHostName())) {
  2713. cachehostip.ipset(queryLocalIP());
  2714. // printf("hostname %s not resolved, using localhost\n",GetCachedHostName()); // don't use jlog in case recursive
  2715. }
  2716. }
  2717. return cachehostip;
  2718. }
  2719. IpAddress &GetHostIp(IpAddress &ip)
  2720. {
  2721. ip.ipset(queryHostIP());
  2722. return ip;
  2723. }
  2724. IpAddress &localHostToNIC(IpAddress &ip)
  2725. {
  2726. if (ip.isLoopBack())
  2727. GetHostIp(ip);
  2728. return ip;
  2729. }
  2730. // IpAddress
  2731. inline bool isIp4(const unsigned *netaddr)
  2732. {
  2733. if (IP4only)
  2734. return true;
  2735. if (netaddr[2]==0xffff0000)
  2736. return (netaddr[1]==0)&&(netaddr[0]==0);
  2737. if (netaddr[2]==0)
  2738. if ((netaddr[3]==0)&&(netaddr[0]==0)&&(netaddr[1]==0))
  2739. return true; // null address
  2740. // maybe should get loopback here
  2741. return false;
  2742. }
  2743. bool IpAddress::isIp4() const
  2744. {
  2745. return ::isIp4(netaddr);
  2746. }
  2747. bool IpAddress::isNull() const
  2748. {
  2749. return (netaddr[3]==0)&&(IP4only||((netaddr[2]==0)&&(netaddr[1]==0)&&(netaddr[0]==0)));
  2750. }
  2751. bool IpAddress::isLoopBack() const
  2752. {
  2753. if (::isIp4(netaddr)&&((netaddr[3] & 0x000000ff)==0x000007f))
  2754. return true;
  2755. return (netaddr[3]==0x1000000)&&(netaddr[2]==0)&&(netaddr[1]==0)&&(netaddr[0]==0);
  2756. }
  2757. bool IpAddress::isLocal() const
  2758. {
  2759. if (isLoopBack() || isHost())
  2760. return true;
  2761. IpAddress ip(*this);
  2762. return isInterfaceIp(ip, NULL);
  2763. }
  2764. bool IpAddress::ipequals(const IpAddress & other) const
  2765. {
  2766. // reverse compare for speed
  2767. return (other.netaddr[3]==netaddr[3])&&(IP4only||((other.netaddr[2]==netaddr[2])&&(other.netaddr[1]==netaddr[1])&&(other.netaddr[0]==netaddr[0])));
  2768. }
  2769. int IpAddress::ipcompare(const IpAddress & other) const
  2770. {
  2771. return memcmp(&netaddr, &other.netaddr, sizeof(netaddr));
  2772. }
  2773. unsigned IpAddress::iphash(unsigned prev) const
  2774. {
  2775. return hashc((const byte *)&netaddr,sizeof(netaddr),prev);
  2776. }
  2777. bool IpAddress::isHost() const
  2778. {
  2779. return ipequals(queryHostIP());
  2780. }
  2781. static bool decodeNumericIP(const char *text,unsigned *netaddr)
  2782. {
  2783. if (!text)
  2784. return false;
  2785. bool isv6 = strchr(text,':')!=NULL;
  2786. StringBuffer tmp;
  2787. if ((*text=='[')&&!IP4only) {
  2788. text++;
  2789. size32_t l = strlen(text);
  2790. if ((l<=2)||(text[l-1]!=']'))
  2791. return false;
  2792. text = tmp.append(l-2,text);
  2793. }
  2794. if (!isv6&&isdigit(text[0])) {
  2795. if (_inet_pton(AF_INET, text, &netaddr[3])>0) {
  2796. netaddr[2] = netaddr[3]?0xffff0000:0; // check for NULL
  2797. netaddr[1] = 0;
  2798. netaddr[0] = 0; // special handling for loopback?
  2799. return true;
  2800. }
  2801. }
  2802. else if (isv6&&!IP4only) {
  2803. int ret = _inet_pton(AF_INET6, text, netaddr);
  2804. if (ret>=0)
  2805. return (ret>0);
  2806. int err = ERRNO();
  2807. StringBuffer tmp("_inet_pton: ");
  2808. tmp.append(text);
  2809. LOGERR(err,1,tmp.str());
  2810. }
  2811. return false;
  2812. }
  // Resolves a host name into the four-word netaddr form.
  //   name    : host name to resolve
  //   netaddr : receives the address (IPv4 results are stored in netaddr[3]
  //             with netaddr[2]=0xffff0000)
  // Uses gethostbyname when IP4only is set or getaddrinfo is not available,
  // otherwise getaddrinfo. Each resolver is retried up to 10 times with an
  // increasing sleep between attempts. When PreferredSubnet is set, an address
  // inside it is preferred; with getaddrinfo the IPv4/IPv6 choice follows
  // IP6preferred.
  // Returns true on success; on failure an error is logged and false returned.
  static bool lookupHostAddress(const char *name,unsigned *netaddr)
  {
      // if IP4only or using MS V6 can only resolve IPv4 using
      static bool recursioncheck = false; // needed to stop error message recursing
      unsigned retry=10;
  #if defined(__linux__) || defined(getaddrinfo)
      if (IP4only) {
  #else
      {
  #endif
          CriticalBlock c(hostnamesect);
          hostent *entry;
          while ((entry = gethostbyname(name))==NULL) {
              if (retry--==0) {
                  if (!recursioncheck) {
                      recursioncheck = true;
                      LogErr(h_errno,1,"gethostbyname failed",__LINE__,name);
                      recursioncheck = false;
                  }
                  return false;
              }
              {
                  // release the lock while sleeping before the next attempt
                  CriticalUnblock ub(hostnamesect);
                  Sleep((10-retry)*100);
              }
          }
          if (!entry->h_addr_list[0])
              return false;

          // default to the first address; if a preferred subnet is set, look
          // through the remaining entries for one inside it
          unsigned pick = 0;
          if (!PreferredSubnet.isNull()) {
              for (pick = 1; ; pick++) {
                  if (entry->h_addr_list[pick]==NULL) {
                      pick = 0;
                      break;
                  }
                  IpAddress candidate;
                  candidate.setNetAddress(sizeof(unsigned),entry->h_addr_list[pick]);
                  if (PreferredSubnet.test(candidate))
                      break;
              }
          }
          memcpy(&netaddr[3], entry->h_addr_list[pick], sizeof(netaddr[3]));
          netaddr[2] = 0xffff0000;
          netaddr[1] = 0;
          netaddr[0] = 0;
          return true;
      }
  #if defined(__linux__) || defined(getaddrinfo)
      struct addrinfo hints;
      struct addrinfo *addrInfo = NULL;
      for (;;) {
          memset(&hints,0,sizeof(hints));
          int ret = getaddrinfo(name, NULL , &hints, &addrInfo);
          if (ret==0)
              break;
          if (retry--==0) {
              if (!recursioncheck) {
                  recursioncheck = true;
                  LogErr(ret,1,"getaddrinfo failed",__LINE__,name);
  #ifdef _DEBUG
                  PrintStackReport();
  #endif
                  recursioncheck = false;
              }
              return false;
          }
          Sleep((10-retry)*100);
      }

      // First pass honours PreferredSubnet (if set); if that finds nothing a
      // second pass accepts any address.
      struct addrinfo *best = NULL;
      bool snm = !PreferredSubnet.isNull();
      for (;;) {
          for (struct addrinfo *ai = addrInfo; ai; ai = ai->ai_next) {
              // printf("flags=%d, family=%d, socktype=%d, protocol=%d, addrlen=%d, canonname=%s\n",ai->ai_flags,ai->ai_family,ai->ai_socktype,ai->ai_protocol,ai->ai_addrlen,ai->ai_canonname?ai->ai_canonname:"NULL");
              if (ai->ai_family==AF_INET) {
                  if (snm) {
                      IpAddress candidate;
                      candidate.setNetAddress(sizeof(in_addr),&(((sockaddr_in *)ai->ai_addr)->sin_addr));
                      if (!PreferredSubnet.test(candidate))
                          continue;
                  }
                  if ((best==NULL)||((best->ai_family==AF_INET6)&&!IP6preferred))
                      best = ai;
              }
              else if (ai->ai_family==AF_INET6) {
                  if (snm) {
                      IpAddress candidate;
                      candidate.setNetAddress(sizeof(in_addr6),&(((sockaddr_in6 *)ai->ai_addr)->sin6_addr));
                      if (!PreferredSubnet.test(candidate))
                          continue;
                  }
                  if ((best==NULL)||((best->ai_family==AF_INET)&&IP6preferred))
                      best = ai;
              }
          }
          if (best||!snm)
              break;
          snm = false;
      }

      if (best) {
          if (best->ai_family==AF_INET6)
              memcpy(netaddr,&(((sockaddr_in6 *)best->ai_addr)->sin6_addr),sizeof(in6_addr));
          else {
              memcpy(netaddr+3,&(((sockaddr_in *)best->ai_addr)->sin_addr),sizeof(in_addr));
              netaddr[2] = 0xffff0000;
              netaddr[1] = 0;
              netaddr[0] = 0;
          }
      }
      freeaddrinfo(addrInfo);
      return best!=NULL;
  #endif
      return false;
  }
  2935. bool IpAddress::ipset(const char *text)
  2936. {
  2937. if (text&&*text) {
  2938. if ((text[0]=='.')&&(text[1]==0)) {
  2939. ipset(queryHostIP());
  2940. return true;
  2941. }
  2942. if (decodeNumericIP(text,netaddr))
  2943. return true;
  2944. const char *s;
  2945. for (s=text;*s;s++)
  2946. if (!isdigit(*s)&&(*s!=':')&&(*s!='.'))
  2947. break;
  2948. if (!*s)
  2949. return ipset(NULL);
  2950. if (lookupHostAddress(text,netaddr))
  2951. return true;
  2952. }
  2953. memset(&netaddr,0,sizeof(netaddr));
  2954. return false;
  2955. }
  2956. inline char * addbyte(char *s,byte b)
  2957. {
  2958. if (b>=100) {
  2959. *(s++) = b/100+'0';
  2960. b %= 100;
  2961. *(s++) = b/10+'0';
  2962. b %= 10;
  2963. }
  2964. else if (b>=10) {
  2965. *(s++) = b/10+'0';
  2966. b %= 10;
  2967. }
  2968. *(s++) = b+'0';
  2969. return s;
  2970. }
  2971. StringBuffer & IpAddress::getIpText(StringBuffer & out) const
  2972. {
  2973. if (::isIp4(netaddr)) {
  2974. const byte *ip = (const byte *)&netaddr[3];
  2975. char ips[16];
  2976. char *s = ips;
  2977. for (unsigned i=0;i<4;i++) {
  2978. if (i)
  2979. *(s++) = '.';
  2980. s = addbyte(s,ip[i]);
  2981. }
  2982. return out.append(s-ips,ips);
  2983. }
  2984. char tmp[INET6_ADDRSTRLEN];
  2985. const char *res = _inet_ntop(AF_INET6, &netaddr, tmp, sizeof(tmp));
  2986. if (!res)
  2987. throw makeOsException(errno);
  2988. return out.append(res);
  2989. }
  2990. void IpAddress::ipserialize(MemoryBuffer & out) const
  2991. {
  2992. if (((netaddr[2]==0xffff0000)||(netaddr[2]==0))&&(netaddr[1]==0)&&(netaddr[0]==0)) {
  2993. if (netaddr[3]==IPV6_SERIALIZE_PREFIX)
  2994. throw MakeStringException(-1,"Invalid network address"); // hack prevention
  2995. out.append(sizeof(netaddr[3]), &netaddr[3]);
  2996. }
  2997. else {
  2998. unsigned pfx = IPV6_SERIALIZE_PREFIX;
  2999. out.append(sizeof(pfx),&pfx).append(sizeof(netaddr),&netaddr);
  3000. }
  3001. }
  3002. void IpAddress::ipdeserialize(MemoryBuffer & in)
  3003. {
  3004. unsigned pfx;
  3005. in.read(sizeof(pfx),&pfx);
  3006. if (pfx!=IPV6_SERIALIZE_PREFIX) {
  3007. netaddr[0] = 0;
  3008. netaddr[1] = 0;
  3009. netaddr[2] = (pfx == 0 || pfx == 0x1000000) ? 0 : 0xffff0000; // catch null and loopback
  3010. netaddr[3] = pfx;
  3011. }
  3012. else
  3013. in.read(sizeof(netaddr),&netaddr);
  3014. }
  3015. unsigned IpAddress::ipdistance(const IpAddress &other,unsigned offset) const
  3016. {
  3017. if (offset>3)
  3018. offset = 3;
  3019. int i1;
  3020. _cpyrev4(&i1,&netaddr[3-offset]);
  3021. int i2;
  3022. _cpyrev4(&i2,&other.netaddr[3-offset]);
  3023. i1-=i2;
  3024. if (i1>0)
  3025. return i1;
  3026. return -i1;
  3027. }
  3028. bool IpAddress::ipincrement(unsigned count,byte minoctet,byte maxoctet,unsigned short minipv6piece,unsigned maxipv6piece)
  3029. {
  3030. unsigned base;
  3031. if (::isIp4(netaddr)) {
  3032. base = maxoctet-minoctet+1;
  3033. if (!base||(base>256))
  3034. return false;
  3035. byte * ips = (byte *)&netaddr[3];
  3036. byte * ip = ips+4;
  3037. while (count) {
  3038. if (ip==ips)
  3039. return false; // overflow
  3040. ip--;
  3041. unsigned v = (count+((*ip>minoctet)?(*ip-minoctet):0));
  3042. *ip = minoctet + v%base;
  3043. count = v/base;
  3044. }
  3045. }
  3046. else {
  3047. base = maxipv6piece-minipv6piece+1;
  3048. if (!base||(base>0x10000))
  3049. return false;
  3050. unsigned short * ps = (unsigned short *)&netaddr;
  3051. unsigned short * p = ps+8;
  3052. while (count) {
  3053. if (p==ps)
  3054. return false; // overflow (actually near impossible!)
  3055. p--;
  3056. unsigned v = (count+((*p>minipv6piece)?(*p-minipv6piece):0));
  3057. *p = minipv6piece + v%base;
  3058. count = v/base;
  3059. }
  3060. }
  3061. return true;
  3062. }
  3063. unsigned IpAddress::ipsetrange( const char *text) // e.g. 10.173.72.1-65 ('-' may be omitted)
  3064. {
  3065. unsigned e=0;
  3066. unsigned f=0;
  3067. const char *r = strchr(text,'-');
  3068. bool ok;
  3069. if (r) {
  3070. e = atoi(r+1);
  3071. StringBuffer tmp(r-text,text);
  3072. ok = ipset(tmp.str());
  3073. if (!::isIp4(netaddr))
  3074. IPV6_NOT_IMPLEMENTED(); // TBD IPv6
  3075. if (ok) {
  3076. while ((r!=text)&&(*(r-1)!='.'))
  3077. r--;
  3078. f = (r!=text)?atoi(r):0;
  3079. }
  3080. }
  3081. else
  3082. ok = ipset(text);
  3083. if ((f>e)||!ok)
  3084. return 0;
  3085. return e-f+1;
  3086. }
  3087. size32_t IpAddress::getNetAddress(size32_t maxsz,void *dst) const
  3088. {
  3089. if (maxsz==sizeof(unsigned)) {
  3090. if (::isIp4(netaddr)) {
  3091. *(unsigned *)dst = netaddr[3];
  3092. return maxsz;
  3093. }
  3094. }
  3095. else if (!IP4only&&(maxsz==sizeof(netaddr))) {
  3096. memcpy(dst,&netaddr,maxsz);
  3097. return maxsz;
  3098. }
  3099. return 0;
  3100. }
  3101. void IpAddress::setNetAddress(size32_t sz,const void *src)
  3102. {
  3103. if (sz==sizeof(unsigned)) { // IPv4
  3104. netaddr[0] = 0;
  3105. netaddr[1] = 0;
  3106. netaddr[3] = *(const unsigned *)src;
  3107. netaddr[2] = netaddr[3] ? 0xffff0000 : 0; // leave as null if Ipv4 address is null
  3108. }
  3109. else if (!IP4only&&(sz==sizeof(netaddr))) { // IPv6
  3110. memcpy(&netaddr,src,sz);
  3111. if ((netaddr[2]==0)&&(netaddr[3]!=0)&&(netaddr[3]!=0x1000000)&&(netaddr[0]==0)&&(netaddr[1]==0))
  3112. netaddr[2]=0xffff0000; // use this form only
  3113. }
  3114. else
  3115. memset(&netaddr,0,sizeof(netaddr));
  3116. }
  3117. void SocketEndpoint::deserialize(MemoryBuffer & in)
  3118. {
  3119. ipdeserialize(in);
  3120. in.read(port);
  3121. }
  3122. void SocketEndpoint::serialize(MemoryBuffer & out) const
  3123. {
  3124. ipserialize(out);
  3125. out.append(port);
  3126. }
  3127. bool SocketEndpoint::set(const char *name,unsigned short _port)
  3128. {
  3129. if (name) {
  3130. if (*name=='[') {
  3131. const char *s = name+1;
  3132. const char *t = strchr(s,']');
  3133. if (t) {
  3134. StringBuffer tmp(t-s,s);
  3135. if (t[1]==':')
  3136. _port = atoi(t+2);
  3137. return set(tmp.str(),_port);
  3138. }
  3139. }
  3140. const char * colon = strchr(name, ':');
  3141. if (colon) {
  3142. if (!IP4only&&strchr(colon+1, ':'))
  3143. colon = NULL; // hello its IpV6
  3144. }
  3145. else
  3146. colon = strchr(name, '|'); // strange hole convention
  3147. char ips[260];
  3148. if (colon) {
  3149. size32_t l = colon-name;
  3150. if (l>=sizeof(ips))
  3151. l = sizeof(ips)-1;
  3152. memcpy(ips,name,l);
  3153. ips[l] = 0;
  3154. name = ips;
  3155. _port = atoi(colon+1);
  3156. }
  3157. if (ipset(name)) {
  3158. port = _port;
  3159. return true;
  3160. }
  3161. }
  3162. ipset(NULL);
  3163. port = 0;
  3164. return false;
  3165. }
  3166. void SocketEndpoint::getUrlStr(char * str, size32_t len) const
  3167. {
  3168. if (len==0)
  3169. return;
  3170. StringBuffer _str;
  3171. getUrlStr(_str);
  3172. size32_t l = _str.length()+1;
  3173. if (l>len)
  3174. {
  3175. l = len-1;
  3176. str[l] = 0;
  3177. }
  3178. memcpy(str,_str.str(),l);
  3179. }
  3180. StringBuffer &SocketEndpoint::getUrlStr(StringBuffer &str) const
  3181. {
  3182. getIpText(str);
  3183. if (port)
  3184. str.append(':').append((unsigned)port); // TBD IPv6 put [] on
  3185. return str;
  3186. }
  3187. unsigned SocketEndpoint::hash(unsigned prev) const
  3188. {
  3189. return hashc((const byte *)&port,sizeof(port),iphash(prev));
  3190. }
  3191. //---------------------------------------------------------------------------
  3192. SocketListCreator::SocketListCreator()
  3193. {
  3194. lastPort = 0;
  3195. }
  3196. void SocketListCreator::addSocket(const SocketEndpoint &ep)
  3197. {
  3198. StringBuffer ipstr;
  3199. ep.getIpText(ipstr);
  3200. addSocket(ipstr.str(), ep.port);
  3201. }
  3202. void SocketListCreator::addSocket(const char * ip, unsigned port)
  3203. {
  3204. if (fullText.length())
  3205. fullText.append("|");
  3206. const char * prev = lastIp;
  3207. const char * startCopy = ip;
  3208. if (prev)
  3209. {
  3210. if (strcmp(ip, prev) == 0)
  3211. {
  3212. fullText.append("=");
  3213. startCopy = NULL;
  3214. }
  3215. else
  3216. {
  3217. const char * cur = ip;
  3218. for (;;)
  3219. {
  3220. char n = *cur;
  3221. if (!n)
  3222. break;
  3223. if (n != *prev)
  3224. break;
  3225. cur++;
  3226. prev++;
  3227. if (n == '.')
  3228. startCopy = cur;
  3229. }
  3230. if (startCopy != ip)
  3231. fullText.append("*");
  3232. }
  3233. }
  3234. fullText.append(startCopy);
  3235. if (lastPort != port)
  3236. fullText.append(":").append(port);
  3237. lastIp.set(ip);
  3238. lastPort = port;
  3239. }
  3240. const char * SocketListCreator::getText()
  3241. {
  3242. return fullText.str();
  3243. }
  3244. void SocketListCreator::addSockets(SocketEndpointArray &array)
  3245. {
  3246. ForEachItemIn(i,array) {
  3247. const SocketEndpoint &sockep=array.item(i);
  3248. StringBuffer ipstr;
  3249. sockep.getIpText(ipstr);
  3250. addSocket(ipstr.str(),sockep.port);
  3251. }
  3252. }
  3253. //---------------------------------------------------------------------------
  3254. SocketListParser::SocketListParser(const char * text)
  3255. {
  3256. fullText.set(text);
  3257. cursor = NULL;
  3258. lastPort = 0;
  3259. }
  3260. void SocketListParser::first(unsigned port)
  3261. {
  3262. cursor = fullText;
  3263. lastIp.set(NULL);
  3264. lastPort = port;
  3265. }
  3266. bool SocketListParser::get(StringAttr & ip, unsigned & port, unsigned index, unsigned defport)
  3267. {
  3268. first(defport);
  3269. do
  3270. {
  3271. if (!next(ip, port))
  3272. return false;
  3273. } while (index--);
  3274. return true;
  3275. }
  3276. bool SocketListParser::next(StringAttr & ip, unsigned & port)
  3277. {
  3278. // IPV6TBD
  3279. StringBuffer ipText;
  3280. if (*cursor == 0)
  3281. return false;
  3282. if (*cursor == '=')
  3283. {
  3284. ipText.append(lastIp);
  3285. cursor++;
  3286. }
  3287. else if (*cursor == '*')
  3288. {
  3289. cursor++;
  3290. //count the number of dots in the tail
  3291. const char * cur = cursor;
  3292. unsigned count = 0;
  3293. for (;;)
  3294. {
  3295. char c = *cur++;
  3296. switch (c)
  3297. {
  3298. case 0:
  3299. case '|':
  3300. case ',':
  3301. case ':':
  3302. goto done;
  3303. case '.':
  3304. ++count;
  3305. break;
  3306. }
  3307. }
  3308. done:
  3309. //copy up to the appropriate dot from the previous ip.
  3310. const unsigned dotCount = 3; //more what about 6 digit ip's
  3311. cur = lastIp;
  3312. for (;;)
  3313. {
  3314. char c = *cur++;
  3315. switch (c)
  3316. {
  3317. case 0:
  3318. case '|':
  3319. case ',':
  3320. case ':':
  3321. assertex(!"Should not get here!");
  3322. goto done2;
  3323. case '.':
  3324. ipText.append(c);
  3325. if (++count == dotCount)
  3326. goto done2;
  3327. break;
  3328. default:
  3329. ipText.append(c);
  3330. break;
  3331. }
  3332. }
  3333. done2:;
  3334. }
  3335. bool inPort = false;
  3336. port = lastPort;
  3337. for (;;)
  3338. {
  3339. char c = *cursor++;
  3340. switch (c)
  3341. {
  3342. case 0:
  3343. cursor--;
  3344. goto doneCopy;
  3345. case '|':
  3346. case ',':
  3347. goto doneCopy;
  3348. case ':':
  3349. port = atoi(cursor);
  3350. inPort = true;
  3351. break;;
  3352. default:
  3353. if (!inPort)
  3354. ipText.append(c);
  3355. break;
  3356. }
  3357. }
  3358. doneCopy:
  3359. lastIp.set(ipText.str());
  3360. ip.set(lastIp);
  3361. lastPort = port;
  3362. return true;
  3363. }
  3364. unsigned SocketListParser::getSockets(SocketEndpointArray &array,unsigned defport)
  3365. {
  3366. first(defport);
  3367. StringAttr ip;
  3368. unsigned port;
  3369. while (next(ip,port)) {
  3370. SocketEndpoint ep(ip,port);
  3371. array.append(ep);
  3372. }
  3373. return array.ordinality();
  3374. }
  3375. void getSocketStatistics(JSocketStatistics &stats)
  3376. {
  3377. // should put in simple lock
  3378. memcpy(&stats,&STATS,sizeof(stats));
  3379. }
  3380. void resetSocketStatistics()
  3381. {
  3382. unsigned activesockets=STATS.activesockets;
  3383. memset(&STATS,0,sizeof(STATS));
  3384. STATS.activesockets = activesockets;
  3385. }
  3386. static StringBuffer &appendtime(StringBuffer &s,unsigned us)
  3387. {
  3388. // attemp to get into more sensible units
  3389. if (us>10000000)
  3390. return s.append(us/1000000).append('s');
  3391. if (us>10000)
  3392. return s.append(us/1000).append("ms");
  3393. return s.append(us).append("us");
  3394. }
  3395. StringBuffer &getSocketStatisticsString(JSocketStatistics &stats,StringBuffer &str)
  3396. {
  3397. str.append("connects=").append(stats.connects).append('\n');
  3398. appendtime(str.append("connecttime="),stats.connecttime).append('\n');
  3399. str.append("failedconnects=").append(stats.failedconnects).append('\n');
  3400. appendtime(str.append("failedconnecttime="),stats.failedconnecttime).append('\n');
  3401. str.append("reads=").append(stats.reads).append('\n');
  3402. appendtime(str.append("readtime="),stats.readtime).append('\n');
  3403. str.append("readsize=").append(stats.readsize).append(" bytes\n");
  3404. str.append("writes=").append(stats.writes).append('\n');
  3405. appendtime(str.append("writetime="),stats.writetime).append('\n');
  3406. str.append("writesize=").append(stats.writesize).append(" bytes").append('\n');
  3407. str.append("activesockets=").append(stats.activesockets).append('\n');
  3408. str.append("numblockrecvs=").append(stats.numblockrecvs).append('\n');
  3409. str.append("numblocksends=").append(stats.numblocksends).append('\n');
  3410. str.append("blockrecvsize=").append(stats.blockrecvsize).append('\n');
  3411. str.append("blocksendsize=").append(stats.blocksendsize).append('\n');
  3412. str.append("blockrecvtime=").append(stats.blockrecvtime).append('\n');
  3413. str.append("blocksendtime=").append(stats.blocksendtime).append('\n');
  3414. str.append("longestblocksend=").append(stats.longestblocksend).append('\n');
  3415. str.append("longestblocksize=").append(stats.longestblocksize);
  3416. return str;
  3417. }
  3418. // ===============================================================================
  3419. // select thread for handling multiple selects
  3420. struct SelectItem
  3421. {
  3422. ISocket *sock;
  3423. T_SOCKET handle;
  3424. ISocketSelectNotify *nfy;
  3425. byte mode;
  3426. bool del;
  3427. bool add_epoll;
  3428. bool operator == (const SelectItem & other) const { return sock == other.sock; }
  3429. };
  3430. class SelectItemArray : public StructArrayOf<SelectItem> { };
  3431. class SelectItemArrayP : public StructArrayOf<SelectItem*> { };
  3432. #define SELECT_TIMEOUT_SECS 1 // but it does (TBD)
  3433. #ifdef _WIN32
  3434. // fd_set utility functions
  3435. inline T_FD_SET *cpyfds(T_FD_SET &dst,const T_FD_SET &src)
  3436. {
  3437. unsigned i = src.fd_count;
  3438. dst.fd_count = i;
  3439. while (i--)
  3440. dst.fd_array[i] = src.fd_array[i]; // possibly better as memcpy
  3441. return &dst;
  3442. }
  3443. inline bool findfds(T_FD_SET &s,T_SOCKET h,bool &c)
  3444. {
  3445. unsigned n = s.fd_count;
  3446. unsigned i;
  3447. for(i=0;i<n;i++) {
  3448. if (s.fd_array[i] == h) {
  3449. if (--n)
  3450. s.fd_array[i] = s.fd_array[n]; // remove item
  3451. else
  3452. c = false;
  3453. s.fd_count = n;
  3454. return true;
  3455. }
  3456. }
  3457. return false;
  3458. }
  3459. inline T_SOCKET popfds(T_FD_SET &s)
  3460. {
  3461. unsigned n = s.fd_count;
  3462. T_SOCKET ret;
  3463. if (n) {
  3464. ret = s.fd_array[--n];
  3465. s.fd_count = n;
  3466. }
  3467. else
  3468. ret = NULL;
  3469. return ret;
  3470. }
  3471. #else
  3472. #define _USE_PIPE_FOR_SELECT_TRIGGER
  3473. // not as optimized as windows but I am expecting to convert to using poll anyway
  3474. inline T_FD_SET *cpyfds(T_FD_SET &dst,const T_FD_SET &src)
  3475. {
  3476. memcpy(&dst,&src,sizeof(T_FD_SET));
  3477. return &dst;
  3478. }
  3479. inline bool findfds(T_FD_SET &s,T_SOCKET h,bool &c)
  3480. {
  3481. if ((unsigned)h>=XFD_SETSIZE)
  3482. return false;
  3483. return FD_ISSET(h,&s); // does not remove entry or set termination flag when done
  3484. }
  3485. #endif
  3486. class CSocketBaseThread: public Thread
  3487. {
  3488. protected:
  3489. bool terminating;
  3490. CriticalSection sect;
  3491. Semaphore ticksem;
  3492. std::atomic_uint tickwait;
  3493. unsigned offset;
  3494. bool selectvarschange;
  3495. unsigned waitingchange;
  3496. Semaphore waitingchangesem;
  3497. int validateselecterror;
  3498. unsigned validateerrcount;
  3499. const char *selecttrace;
  3500. unsigned basesize;
  3501. #ifdef _USE_PIPE_FOR_SELECT_TRIGGER
  3502. T_SOCKET dummysock[2];
  3503. #else
  3504. T_SOCKET dummysock;
  3505. #endif
  3506. bool dummysockopen;
  3507. CSocketBaseThread(const char *trc) : Thread("CSocketBaseThread"), tickwait(0)
  3508. {
  3509. }
  3510. ~CSocketBaseThread()
  3511. {
  3512. }
  3513. public:
  3514. void triggerselect()
  3515. {
  3516. if (tickwait)
  3517. ticksem.signal();
  3518. #ifdef _USE_PIPE_FOR_SELECT_TRIGGER
  3519. CriticalBlock block(sect);
  3520. char c = 0;
  3521. if(write(dummysock[1], &c, 1) != 1) {
  3522. int err = ERRNO();
  3523. LOGERR(err,1,"Socket closed during trigger select");
  3524. }
  3525. #else
  3526. closedummy();
  3527. #endif
  3528. }
  3529. void resettrigger()
  3530. {
  3531. #ifdef _USE_PIPE_FOR_SELECT_TRIGGER
  3532. CriticalBlock block(sect);
  3533. char c;
  3534. while((::read(dummysock[0], &c, sizeof(c))) == sizeof(c));
  3535. #endif
  3536. }
  3537. void stop(bool wait)
  3538. {
  3539. terminating = true;
  3540. triggerselect();
  3541. if (wait)
  3542. join();
  3543. }
  3544. bool sockOk(T_SOCKET sock)
  3545. {
  3546. PROGLOG("CSocketBaseThread: sockOk testing %d",sock);
  3547. int t=0;
  3548. socklen_t tl = sizeof(t);
  3549. if (getsockopt(sock, SOL_SOCKET, SO_TYPE, (char *)&t, &tl)!=0) {
  3550. StringBuffer sockstr;
  3551. const char *tracename = sockstr.append((unsigned)sock).str();
  3552. LOGERR2(ERRNO(),1,"CSocketBaseThread select handle");
  3553. return false;
  3554. }
  3555. #ifdef _USE_SELECT
  3556. T_FD_SET fds;
  3557. struct timeval tv;
  3558. CHECKSOCKRANGE(sock);
  3559. XFD_ZERO(&fds);
  3560. FD_SET((unsigned)sock, &fds);
  3561. //FD_SET((unsigned)sock, &except);
  3562. tv.tv_sec = 0;
  3563. tv.tv_usec = 0;
  3564. int rc = ::select( sock + 1, NULL, (fd_set *)&fds, NULL, &tv );
  3565. if (rc<0) {
  3566. StringBuffer sockstr;
  3567. const char *tracename = sockstr.append((unsigned)sock).str();
  3568. LOGERR2(ERRNO(),2,"CSocketBaseThread select handle");
  3569. return false;
  3570. }
  3571. else if (rc>0)
  3572. PROGLOG("CSocketBaseThread: select handle %d selected(2) %d",sock,rc);
  3573. XFD_ZERO(&fds);
  3574. FD_SET((unsigned)sock, &fds);
  3575. tv.tv_sec = 0;
  3576. tv.tv_usec = 0;
  3577. rc = ::select( sock + 1, (fd_set *)&fds, NULL, NULL, &tv );
  3578. if (rc<0) {
  3579. StringBuffer sockstr;
  3580. const char *tracename = sockstr.append((unsigned)sock).str();
  3581. LOGERR2(ERRNO(),3,"CSocketBaseThread select handle");
  3582. return false;
  3583. }
  3584. else if (rc>0)
  3585. PROGLOG("CSocketBaseThread: select handle %d selected(2) %d",sock,rc);
  3586. return true;
  3587. #else
  3588. struct pollfd fds[1];
  3589. fds[0].fd = sock;
  3590. fds[0].events = POLLIN | POLLOUT;
  3591. fds[0].revents = 0;
  3592. int rc = ::poll(fds, 1, 0);
  3593. if (rc==0)
  3594. return true;
  3595. else if (rc>0)
  3596. {
  3597. if ( !(fds[0].revents & POLLNVAL) )
  3598. {
  3599. PROGLOG("CSocketBaseThread: poll handle %d selected(2) %d",sock,rc);
  3600. return true;
  3601. }
  3602. }
  3603. StringBuffer sockstr;
  3604. const char *tracename = sockstr.append((unsigned)sock).str();
  3605. LOGERR2(ERRNO(),3,"CSocketBaseThread poll handle");
  3606. return false;
  3607. #endif
  3608. }
  3609. virtual void closedummy() = 0;
  3610. };
  3611. class CSocketSelectThread: public CSocketBaseThread
  3612. {
  3613. SelectItemArray items;
  3614. void opendummy()
  3615. {
  3616. CriticalBlock block(sect);
  3617. if (!dummysockopen) {
  3618. #ifdef _USE_PIPE_FOR_SELECT_TRIGGER
  3619. if(pipe(dummysock)) {
  3620. WARNLOG("CSocketSelectThread: create pipe failed %d",ERRNO());
  3621. return;
  3622. }
  3623. for (unsigned i=0;i<2;i++) {
  3624. int flags = fcntl(dummysock[i], F_GETFL, 0);
  3625. if (flags!=-1) {
  3626. flags |= O_NONBLOCK;
  3627. fcntl(dummysock[i], F_SETFL, flags);
  3628. }
  3629. flags = fcntl(dummysock[i], F_GETFD, 0);
  3630. if (flags!=-1) {
  3631. flags |= FD_CLOEXEC;
  3632. fcntl(dummysock[i], F_SETFD, flags);
  3633. }
  3634. }
  3635. CHECKSOCKRANGE(dummysock[0]);
  3636. #else
  3637. if (IP6preferred)
  3638. dummysock = ::socket(AF_INET6, SOCK_STREAM, PF_INET6);
  3639. else
  3640. dummysock = ::socket(AF_INET, SOCK_STREAM, 0);
  3641. CHECKSOCKRANGE(dummysock);
  3642. #endif
  3643. dummysockopen = true;
  3644. }
  3645. }
  3646. void closedummy()
  3647. {
  3648. CriticalBlock block(sect);
  3649. if (dummysockopen) {
  3650. #ifdef _USE_PIPE_FOR_SELECT_TRIGGER
  3651. #ifdef SOCKTRACE
  3652. PROGLOG("SOCKTRACE: Closing dummy sockets %x %d %x %d (%p)", dummysock[0], dummysock[0], dummysock[1], dummysock[1], this);
  3653. #endif
  3654. ::close(dummysock[0]);
  3655. ::close(dummysock[1]);
  3656. #else
  3657. #ifdef _WIN32
  3658. ::closesocket(dummysock);
  3659. #else
  3660. ::close(dummysock);
  3661. #endif
  3662. #endif
  3663. dummysockopen = false;
  3664. }
  3665. }
  3666. #ifdef _WIN32
  3667. #define HASHTABSIZE 256
  3668. #define HASHNULL (HASHTABSIZE-1)
  3669. #define HASHTABMASK (HASHTABSIZE-1)
  3670. byte hashtab[HASHTABSIZE];
  3671. #define HASHSOCKET(s) ((((unsigned)s)>>2)&HASHTABMASK) // with some knowledge of windows handles
  3672. void inithash()
  3673. {
  3674. memset(&hashtab,HASHNULL,sizeof(hashtab));
  3675. assertex(FD_SETSIZE<255);
  3676. }
  3677. void reinithash()
  3678. { // done this way because index of items changes and hash table not that big
  3679. inithash();
  3680. assertex(items.ordinality()<HASHTABSIZE-1);
  3681. ForEachItemIn(i,items) {
  3682. unsigned h = HASHSOCKET(items.item(i).handle);
  3683. for (;;) {
  3684. if (hashtab[h]==HASHNULL) {
  3685. hashtab[h] = (byte)i;
  3686. break;
  3687. }
  3688. if (++h==HASHTABSIZE)
  3689. h = 0;
  3690. }
  3691. }
  3692. }
  3693. inline SelectItem &findhash(T_SOCKET handle)
  3694. {
  3695. unsigned h = HASHSOCKET(handle);
  3696. unsigned sh = h;
  3697. for (;;) {
  3698. SelectItem &i=items.element(hashtab[h]);
  3699. if (i.handle==handle)
  3700. return i;
  3701. if (++h==HASHTABSIZE)
  3702. h = 0;
  3703. assertex(h!=sh);
  3704. }
  3705. }
  3706. inline void processfds(T_FD_SET &s,byte mode,SelectItemArray &tonotify)
  3707. {
  3708. for (;;) {
  3709. T_SOCKET sock = popfds(s);
  3710. if (!sock)
  3711. break;
  3712. if (sock!=dummysock)
  3713. {
  3714. SelectItem &si = findhash(sock);
  3715. if (!si.del)
  3716. {
  3717. tonotify.append(si); // nb copies
  3718. SelectItem &itm = tonotify.element(tonotify.length()-1);
  3719. itm.nfy->Link();
  3720. itm.sock->Link();
  3721. itm.mode = mode;
  3722. }
  3723. }
  3724. }
  3725. }
  3726. #endif
  3727. public:
  3728. IMPLEMENT_IINTERFACE;
  3729. CSocketSelectThread(const char *trc)
  3730. : CSocketBaseThread("CSocketSelectThread")
  3731. {
  3732. dummysockopen = false;
  3733. opendummy();
  3734. terminating = false;
  3735. waitingchange = 0;
  3736. selectvarschange = false;
  3737. validateselecterror = 0;
  3738. validateerrcount = 0;
  3739. offset = 0;
  3740. selecttrace = trc;
  3741. basesize = 0;
  3742. #ifdef _WIN32
  3743. inithash();
  3744. #endif
  3745. }
  3746. ~CSocketSelectThread()
  3747. {
  3748. closedummy();
  3749. ForEachItemIn(i,items) {
  3750. // Release/dtors should not throw but leaving try/catch here until all paths checked
  3751. try {
  3752. SelectItem &si = items.element(i);
  3753. si.nfy->Release();
  3754. si.sock->Release();
  3755. }
  3756. catch (IException *e) {
  3757. EXCLOG(e,"~CSocketSelectThread");
  3758. e->Release();
  3759. }
  3760. }
  3761. }
  3762. Owned<IException> termexcept;
  3763. void updateItems()
  3764. {
  3765. // must be in CriticalBlock block(sect);
  3766. unsigned n = items.ordinality();
  3767. #ifdef _WIN32
  3768. bool hashupdateneeded = (n!=basesize); // additions all come at end
  3769. #endif
  3770. for (unsigned i=0;i<n;) {
  3771. SelectItem &si = items.element(i);
  3772. if (si.del) {
  3773. // Release/dtors should not throw but leaving try/catch here until all paths checked
  3774. try {
  3775. #ifdef SOCKTRACE
  3776. PROGLOG("CSocketSelectThread::updateItems release %d",si.handle);
  3777. #endif
  3778. si.nfy->Release();
  3779. si.sock->Release();
  3780. }
  3781. catch (IException *e) {
  3782. EXCLOG(e,"CSocketSelectThread::updateItems");
  3783. e->Release();
  3784. }
  3785. // NOTE: si is a reference, put last item into remove slot
  3786. n--;
  3787. if (i<n)
  3788. si = items.item(n);
  3789. items.remove(n);
  3790. #ifdef _WIN32
  3791. hashupdateneeded = true;
  3792. #endif
  3793. }
  3794. else
  3795. i++;
  3796. }
  3797. assertex(n<=XFD_SETSIZE-1);
  3798. #ifdef _WIN32
  3799. if (hashupdateneeded)
  3800. reinithash();
  3801. #endif
  3802. basesize = n;
  3803. }
  3804. bool checkSocks()
  3805. {
  3806. bool ret = false;
  3807. ForEachItemIn(i,items) {
  3808. SelectItem &si = items.element(i);
  3809. if (si.del)
  3810. ret = true; // maybe that bad one
  3811. else if (!sockOk(si.handle)) {
  3812. si.del = true;
  3813. ret = true;
  3814. }
  3815. }
  3816. return ret;
  3817. }
  3818. bool remove(ISocket *sock)
  3819. {
  3820. if (terminating)
  3821. return false;
  3822. CriticalBlock block(sect);
  3823. if (sock==NULL) { // wait until no changes outstanding
  3824. while (selectvarschange) {
  3825. waitingchange++;
  3826. CriticalUnblock unblock(sect);
  3827. waitingchangesem.wait();
  3828. }
  3829. return true;
  3830. }
  3831. ForEachItemIn(i,items) {
  3832. SelectItem &si = items.element(i);
  3833. if (!si.del&&(si.sock==sock)) {
  3834. si.del = true;
  3835. selectvarschange = true;
  3836. triggerselect();
  3837. return true;
  3838. }
  3839. }
  3840. return false;
  3841. }
  3842. bool add(ISocket *sock,unsigned mode,ISocketSelectNotify *nfy)
  3843. {
  3844. // maybe check once to prevent 1st delay? TBD
  3845. CriticalBlock block(sect);
  3846. unsigned n=0;
  3847. ForEachItemIn(i,items) {
  3848. SelectItem &si = items.element(i);
  3849. if (!si.del) {
  3850. if (si.sock==sock) {
  3851. si.del = true;
  3852. }
  3853. else
  3854. n++;
  3855. }
  3856. }
  3857. if (n>=XFD_SETSIZE-1) // leave 1 spare
  3858. return false;
  3859. SelectItem sn;
  3860. sn.nfy = LINK(nfy);
  3861. sn.sock = LINK(sock);
  3862. sn.mode = (byte)mode;
  3863. sn.handle = (T_SOCKET)sock->OShandle();
  3864. CHECKSOCKRANGE(sn.handle);
  3865. sn.del = false;
  3866. sn.add_epoll = false;
  3867. items.append(sn);
  3868. selectvarschange = true;
  3869. triggerselect();
  3870. return true;
  3871. }
  3872. void updateSelectVars(T_FD_SET &rdfds,T_FD_SET &wrfds,T_FD_SET &exfds,bool &isrd,bool &iswr,bool &isex,unsigned &ni,T_SOCKET &max_sockid)
  3873. {
  3874. CriticalBlock block(sect);
  3875. selectvarschange = false;
  3876. if (waitingchange) {
  3877. waitingchangesem.signal(waitingchange);
  3878. waitingchange = 0;
  3879. }
  3880. if (validateselecterror) { // something went wrong so check sockets
  3881. validateerrcount++;
  3882. if (!checkSocks()) {
  3883. // bad socket not found
  3884. PROGLOG("CSocketSelectThread::updateSelectVars cannot find socket error");
  3885. if (validateerrcount>10)
  3886. throw MakeStringException(-1,"CSocketSelectThread:Socket select error %d",validateselecterror);
  3887. }
  3888. }
  3889. else
  3890. validateerrcount = 0;
  3891. updateItems();
  3892. XFD_ZERO( &rdfds );
  3893. XFD_ZERO( &wrfds );
  3894. XFD_ZERO( &exfds );
  3895. isrd=false;
  3896. iswr=false;
  3897. isex=false;
  3898. #ifdef _USE_PIPE_FOR_SELECT_TRIGGER
  3899. max_sockid=dummysockopen?dummysock[0]:0;
  3900. #else
  3901. opendummy();
  3902. max_sockid=dummysockopen?dummysock:0;
  3903. #endif
  3904. ni = items.ordinality();
  3905. #ifdef _WIN32
  3906. if (offset>=ni)
  3907. #endif
  3908. offset = 0;
  3909. unsigned j = offset;
  3910. ForEachItemIn(i, items) {
  3911. SelectItem &si = items.element(j);
  3912. j++;
  3913. if (j==ni)
  3914. j = 0;
  3915. if (si.mode & SELECTMODE_READ) {
  3916. FD_SET( si.handle, &rdfds );
  3917. isrd = true;
  3918. }
  3919. if (si.mode & SELECTMODE_WRITE) {
  3920. FD_SET( si.handle, &wrfds );
  3921. iswr = true;
  3922. }
  3923. if (si.mode & SELECTMODE_EXCEPT) {
  3924. FD_SET( si.handle, &exfds );
  3925. isex = true;
  3926. }
  3927. max_sockid=std::max(si.handle, max_sockid);
  3928. }
  3929. if (dummysockopen) {
  3930. #ifdef _USE_PIPE_FOR_SELECT_TRIGGER
  3931. FD_SET( dummysock[0], &rdfds );
  3932. isrd = true;
  3933. #else
  3934. FD_SET( dummysock, &exfds );
  3935. isex = true;
  3936. #endif
  3937. }
  3938. validateselecterror = 0;
  3939. max_sockid++;
  3940. #ifdef SOCKTRACE
  3941. PROGLOG("SOCKTRACE: selecting on %d sockets",ni);
  3942. #endif
  3943. }
  3944. int run()
  3945. {
  3946. try
  3947. {
  3948. T_FD_SET rdfds;
  3949. T_FD_SET wrfds;
  3950. T_FD_SET exfds;
  3951. timeval selecttimeout;
  3952. bool isrd = false;
  3953. bool iswr = false;
  3954. bool isex = false;
  3955. T_SOCKET maxsockid = 0;
  3956. unsigned ni = 0;
  3957. selectvarschange = true;
  3958. unsigned numto = 0;
  3959. unsigned lastnumto = 0;
  3960. unsigned totnum = 0;
  3961. unsigned total = 0;
  3962. while (!terminating)
  3963. {
  3964. selecttimeout.tv_sec = SELECT_TIMEOUT_SECS; // linux modifies so initialize inside loop
  3965. selecttimeout.tv_usec = 0;
  3966. if (selectvarschange)
  3967. {
  3968. updateSelectVars(rdfds,wrfds,exfds,isrd,iswr,isex,ni,maxsockid);
  3969. }
  3970. if (ni==0)
  3971. {
  3972. validateerrcount = 0;
  3973. tickwait++;
  3974. if(!selectvarschange&&!terminating)
  3975. ticksem.wait(SELECT_TIMEOUT_SECS*1000);
  3976. tickwait--;
  3977. continue;
  3978. }
  3979. T_FD_SET rs;
  3980. T_FD_SET ws;
  3981. T_FD_SET es;
  3982. T_FD_SET *rsp = isrd?cpyfds(rs,rdfds):NULL;
  3983. T_FD_SET *wsp = iswr?cpyfds(ws,wrfds):NULL;
  3984. T_FD_SET *esp = isex?cpyfds(es,exfds):NULL;
  3985. int n = ::select(maxsockid,(fd_set *)rsp,(fd_set *)wsp,(fd_set *)esp,&selecttimeout); // first parameter needed for posix
  3986. if (terminating)
  3987. break;
  3988. if (n < 0)
  3989. {
  3990. CriticalBlock block(sect);
  3991. int err = ERRNO();
  3992. if (err != JSE_INTR)
  3993. {
  3994. if (dummysockopen)
  3995. {
  3996. LOGERR(err,12,"CSocketSelectThread select error"); // should cache error ?
  3997. validateselecterror = err;
  3998. #ifndef _USE_PIPE_FOR_SELECT_TRIGGER
  3999. closedummy(); // just in case was culprit
  4000. #endif
  4001. }
  4002. selectvarschange = true;
  4003. continue;
  4004. }
  4005. n = 0;
  4006. }
  4007. else if (n>0)
  4008. {
  4009. validateerrcount = 0;
  4010. numto = 0;
  4011. lastnumto = 0;
  4012. total += n;
  4013. totnum++;
  4014. SelectItemArray tonotify;
  4015. {
  4016. CriticalBlock block(sect);
  4017. #ifdef _WIN32
  4018. if (isrd)
  4019. processfds(rs,SELECTMODE_READ,tonotify);
  4020. if (iswr)
  4021. processfds(ws,SELECTMODE_WRITE,tonotify);
  4022. if (isex)
  4023. processfds(es,SELECTMODE_EXCEPT,tonotify);
  4024. #else
  4025. unsigned i;
  4026. SelectItem *si = items.getArray(offset);
  4027. SelectItem *sie = items.getArray(ni-1)+1;
  4028. bool r = isrd;
  4029. bool w = iswr;
  4030. bool e = isex;
  4031. #ifdef _USE_PIPE_FOR_SELECT_TRIGGER
  4032. if (r&&dummysockopen&&findfds(rs,dummysock[0],r))
  4033. {
  4034. resettrigger();
  4035. --n;
  4036. }
  4037. #endif
  4038. for (i=0;(n>0)&&(i<ni);i++)
  4039. {
  4040. unsigned addMode = 0;
  4041. if (r&&findfds(rs,si->handle,r))
  4042. {
  4043. if (!si->del)
  4044. addMode = SELECTMODE_READ;
  4045. --n;
  4046. }
  4047. if (w&&findfds(ws,si->handle,w))
  4048. {
  4049. if (!si->del)
  4050. addMode = SELECTMODE_WRITE;
  4051. --n;
  4052. }
  4053. if (e&&findfds(es,si->handle,e))
  4054. {
  4055. if (!si->del)
  4056. addMode = SELECTMODE_EXCEPT;
  4057. --n;
  4058. }
  4059. if (addMode)
  4060. {
  4061. tonotify.append(*si);
  4062. SelectItem &itm = tonotify.element(tonotify.length()-1);
  4063. itm.nfy->Link();
  4064. itm.sock->Link();
  4065. itm.mode = addMode;
  4066. }
  4067. si++;
  4068. if (si==sie)
  4069. si = items.getArray();
  4070. }
  4071. #endif
  4072. }
  4073. ForEachItemIn(j,tonotify)
  4074. {
  4075. const SelectItem &si = tonotify.item(j);
  4076. try
  4077. {
  4078. si.nfy->notifySelected(si.sock,si.mode); // ignore return
  4079. }
  4080. catch (IException *e)
  4081. { // should be acted upon by notifySelected
  4082. // could also not throw until after handling all events ...
  4083. EXCLOG(e,"CSocketSelectThread notifySelected");
  4084. throw ;
  4085. }
  4086. // Release/dtors should not throw but leaving try/catch here until all paths checked
  4087. try
  4088. {
  4089. si.nfy->Release();
  4090. si.sock->Release();
  4091. }
  4092. catch (IException *e)
  4093. {
  4094. EXCLOG(e,"CSocketSelectThread nfy/sock Release");
  4095. e->Release();
  4096. }
  4097. }
  4098. }
  4099. else
  4100. {
  4101. validateerrcount = 0;
  4102. if ((++numto>=lastnumto*2))
  4103. {
  4104. lastnumto = numto;
  4105. if (selecttrace&&(numto>4))
  4106. PROGLOG("%s: Select Idle(%d), %d,%d,%0.2f",selecttrace,numto,totnum,total,totnum?((double)total/(double)totnum):0.0);
  4107. }
  4108. /*
  4109. if (numto&&(numto%100))
  4110. {
  4111. CriticalBlock block(sect);
  4112. if (!selectvarschange)
  4113. selectvarschange = checkSocks();
  4114. }
  4115. */
  4116. }
  4117. if (++offset>=ni)
  4118. offset = 0;
  4119. }
  4120. }
  4121. catch (IException *e)
  4122. {
  4123. EXCLOG(e,"CSocketSelectThread");
  4124. termexcept.setown(e);
  4125. }
  4126. CriticalBlock block(sect);
  4127. try
  4128. {
  4129. updateItems();
  4130. }
  4131. catch (IException *e)
  4132. {
  4133. EXCLOG(e,"CSocketSelectThread(2)");
  4134. if (!termexcept)
  4135. termexcept.setown(e);
  4136. else
  4137. e->Release();
  4138. }
  4139. return 0;
  4140. }
  4141. };
  4142. class CSocketSelectHandler: implements ISocketSelectHandler, public CInterface
  4143. {
  4144. CIArrayOf<CSocketSelectThread> threads;
  4145. CriticalSection sect;
  4146. bool started;
  4147. StringAttr selecttrace;
  4148. public:
  4149. IMPLEMENT_IINTERFACE;
  4150. CSocketSelectHandler(const char *trc)
  4151. : selecttrace(trc)
  4152. {
  4153. started = false;
  4154. }
  4155. void start()
  4156. {
  4157. CriticalBlock block(sect);
  4158. if (!started) {
  4159. started = true;
  4160. ForEachItemIn(i,threads) {
  4161. threads.item(i).start();
  4162. }
  4163. }
  4164. }
  4165. void add(ISocket *sock,unsigned mode,ISocketSelectNotify *nfy)
  4166. {
  4167. CriticalBlock block(sect);
  4168. for (;;) {
  4169. bool added=false;
  4170. ForEachItemIn(i,threads) {
  4171. if (added)
  4172. threads.item(i).remove(sock);
  4173. else
  4174. added = threads.item(i).add(sock,mode,nfy);
  4175. }
  4176. if (added)
  4177. return;
  4178. CSocketSelectThread *thread = new CSocketSelectThread(selecttrace);
  4179. threads.append(*thread);
  4180. if (started)
  4181. thread->start();
  4182. }
  4183. }
  4184. void remove(ISocket *sock)
  4185. {
  4186. CriticalBlock block(sect);
  4187. ForEachItemIn(i,threads) {
  4188. if (threads.item(i).remove(sock)&&sock)
  4189. break;
  4190. }
  4191. }
  4192. void stop(bool wait)
  4193. {
  4194. IException *e=NULL;
  4195. CriticalBlock block(sect);
  4196. unsigned i = 0;
  4197. while (i<threads.ordinality()) {
  4198. CSocketSelectThread &t=threads.item(i);
  4199. {
  4200. CriticalUnblock unblock(sect);
  4201. t.stop(wait); // not quite as quick as could be if wait true
  4202. }
  4203. if (wait && !e && t.termexcept)
  4204. e = t.termexcept.getClear();
  4205. i++;
  4206. }
  4207. #if 0 // don't throw error as too late
  4208. if (e)
  4209. throw e;
  4210. #else
  4211. ::Release(e);
  4212. #endif
  4213. }
  4214. };
  4215. #ifdef _HAS_EPOLL_SUPPORT
  4216. class CSocketEpollThread: public CSocketBaseThread
  4217. {
  4218. int epfd;
  4219. SelectItem *sidummy;
  4220. SelectItemArrayP items;
  4221. struct epoll_event *epevents;
  4222. void epoll_op(int efd, int op, SelectItem *si, unsigned int event_mask)
  4223. {
  4224. int fd = si->handle;
  4225. int srtn;
  4226. struct epoll_event event;
  4227. // write all bytes to eliminate uninitialized warnings
  4228. memset(&event, 0, sizeof(event));
  4229. event.events = event_mask;
  4230. event.data.ptr = si;
  4231. # ifdef EPOLLTRACE
  4232. DBGLOG("EPOLL: op(%d) fd %d to epfd %d", op, fd, efd);
  4233. # endif
  4234. srtn = ::epoll_ctl(efd, op, fd, &event);
  4235. // if another thread closed fd before here don't fail
  4236. if ( (srtn < 0) && (op != EPOLL_CTL_DEL) ){
  4237. int err = ERRNO();
  4238. WARNLOG("epoll_ctl failed op:%d, fd:%d, err=%d", op, fd, err);
  4239. }
  4240. }
  4241. void opendummy()
  4242. {
  4243. CriticalBlock block(sect);
  4244. if (!dummysockopen)
  4245. {
  4246. sidummy = new SelectItem;
  4247. sidummy->sock = nullptr;
  4248. sidummy->nfy = nullptr;
  4249. sidummy->del = true; // so its not added to tonotify ...
  4250. sidummy->add_epoll = false;
  4251. sidummy->mode = 0;
  4252. #ifdef _USE_PIPE_FOR_SELECT_TRIGGER
  4253. if(pipe(dummysock))
  4254. {
  4255. WARNLOG("CSocketEpollThread: create pipe failed %d",ERRNO());
  4256. return;
  4257. }
  4258. for (unsigned i=0;i<2;i++)
  4259. {
  4260. int flags = fcntl(dummysock[i], F_GETFL, 0);
  4261. if (flags!=-1)
  4262. {
  4263. flags |= O_NONBLOCK;
  4264. fcntl(dummysock[i], F_SETFL, flags);
  4265. }
  4266. flags = fcntl(dummysock[i], F_GETFD, 0);
  4267. if (flags!=-1)
  4268. {
  4269. flags |= FD_CLOEXEC;
  4270. fcntl(dummysock[i], F_SETFD, flags);
  4271. }
  4272. }
  4273. sidummy->handle = dummysock[0];
  4274. epoll_op(epfd, EPOLL_CTL_ADD, sidummy, EPOLLIN);
  4275. #else
  4276. if (IP6preferred)
  4277. dummysock = ::socket(AF_INET6, SOCK_STREAM, PF_INET6);
  4278. else
  4279. dummysock = ::socket(AF_INET, SOCK_STREAM, 0);
  4280. // added EPOLLIN also because cannot find anywhere MSG_OOB is sent
  4281. // added here to match existing select() code above which sets
  4282. // the except fd_set mask.
  4283. sidummy->handle = dummysock;
  4284. epoll_op(epfd, EPOLL_CTL_ADD, sidummy, (EPOLLIN | EPOLLPRI));
  4285. #endif
  4286. dummysockopen = true;
  4287. }
  4288. }
  4289. void closedummy()
  4290. {
  4291. CriticalBlock block(sect);
  4292. if (dummysockopen)
  4293. {
  4294. epoll_op(epfd, EPOLL_CTL_DEL, sidummy, 0);
  4295. #ifdef _USE_PIPE_FOR_SELECT_TRIGGER
  4296. #ifdef SOCKTRACE
  4297. PROGLOG("SOCKTRACE: Closing dummy sockets %x %d %x %d (%p)", dummysock[0], dummysock[0], dummysock[1], dummysock[1], this);
  4298. #endif
  4299. ::close(dummysock[0]);
  4300. ::close(dummysock[1]);
  4301. #else
  4302. ::close(dummysock);
  4303. #endif
  4304. delete sidummy;
  4305. dummysockopen = false;
  4306. }
  4307. }
  4308. public:
  4309. IMPLEMENT_IINTERFACE;
  4310. CSocketEpollThread(const char *trc)
  4311. : CSocketBaseThread("CSocketEpollThread")
  4312. {
  4313. dummysockopen = false;
  4314. terminating = false;
  4315. waitingchange = 0;
  4316. selectvarschange = false;
  4317. validateselecterror = 0;
  4318. validateerrcount = 0;
  4319. offset = 0;
  4320. selecttrace = trc;
  4321. epfd = ::epoll_create(MAX_RET_EVENTS);
  4322. if (epfd < 0)
  4323. {
  4324. int err = ERRNO();
  4325. LOGERR(err,1,"epoll_create()");
  4326. THROWJSOCKEXCEPTION2(err);
  4327. }
  4328. # ifdef FD_CLOEXEC
  4329. int epflags = fcntl(epfd, F_GETFD, 0);
  4330. if (epflags != -1)
  4331. {
  4332. epflags |= FD_CLOEXEC;
  4333. fcntl(epfd, F_SETFD, epflags);
  4334. }
  4335. # endif
  4336. # ifdef EPOLLTRACE
  4337. DBGLOG("CSocketEpollThread: creating epoll fd %d", epfd );
  4338. # endif
  4339. try
  4340. {
  4341. epevents = new struct epoll_event[MAX_RET_EVENTS];
  4342. }
  4343. catch (const std::bad_alloc &e)
  4344. {
  4345. int err = ERRNO();
  4346. LOGERR(err,1,"epevents alloc()");
  4347. THROWJSOCKEXCEPTION2(err);
  4348. }
  4349. opendummy();
  4350. }
  4351. ~CSocketEpollThread()
  4352. {
  4353. closedummy();
  4354. ForEachItemIn(i,items)
  4355. {
  4356. SelectItem *si = items.element(i);
  4357. epoll_op(epfd, EPOLL_CTL_DEL, si, 0);
  4358. // Release/dtors should not throw but leaving try/catch here until all paths checked
  4359. try
  4360. {
  4361. si->nfy->Release();
  4362. si->sock->Release();
  4363. }
  4364. catch (IException *e)
  4365. {
  4366. EXCLOG(e,"~CSocketEpollThread");
  4367. e->Release();
  4368. }
  4369. delete si;
  4370. }
  4371. if (epfd >= 0)
  4372. {
  4373. # ifdef EPOLLTRACE
  4374. DBGLOG("EPOLL: closing epfd %d", epfd);
  4375. # endif
  4376. ::close(epfd);
  4377. epfd = -1;
  4378. delete [] epevents;
  4379. }
  4380. }
  4381. Owned<IException> termexcept;
  4382. void updateItems()
  4383. {
  4384. // must be in CriticalBlock block(sect);
  4385. unsigned n = items.ordinality();
  4386. for (unsigned i=0;i<n;)
  4387. {
  4388. SelectItem *si = items.element(i);
  4389. if (si->del)
  4390. {
  4391. epoll_op(epfd, EPOLL_CTL_DEL, si, 0);
  4392. // Release/dtors should not throw but leaving try/catch here until all paths checked
  4393. try
  4394. {
  4395. #ifdef SOCKTRACE
  4396. PROGLOG("CSocketEpollThread::updateItems release %d",si->handle);
  4397. #endif
  4398. si->nfy->Release();
  4399. si->sock->Release();
  4400. }
  4401. catch (IException *e)
  4402. {
  4403. EXCLOG(e,"CSocketEpollThread::updateItems");
  4404. e->Release();
  4405. }
  4406. delete si;
  4407. si = nullptr;
  4408. n--;
  4409. if (i<n)
  4410. items.swap(i,n);
  4411. items.remove(n);
  4412. }
  4413. else
  4414. {
  4415. if (si->add_epoll)
  4416. {
  4417. si->add_epoll = false;
  4418. if (si->mode != 0)
  4419. {
  4420. unsigned int ep_mode = 0;
  4421. if (si->mode & SELECTMODE_READ)
  4422. ep_mode |= EPOLLIN;
  4423. if (si->mode & SELECTMODE_WRITE)
  4424. ep_mode |= EPOLLOUT;
  4425. if (si->mode & SELECTMODE_EXCEPT)
  4426. ep_mode |= EPOLLPRI;
  4427. if (ep_mode != 0)
  4428. epoll_op(epfd, EPOLL_CTL_ADD, si, ep_mode);
  4429. }
  4430. }
  4431. i++;
  4432. }
  4433. }
  4434. }
  4435. bool checkSocks()
  4436. {
  4437. bool ret = false;
  4438. ForEachItemIn(i,items)
  4439. {
  4440. SelectItem *si = items.element(i);
  4441. if (si->del)
  4442. ret = true; // maybe that bad one
  4443. else if (!sockOk(si->handle))
  4444. {
  4445. si->del = true;
  4446. ret = true;
  4447. }
  4448. }
  4449. return ret;
  4450. }
  4451. bool remove(ISocket *sock)
  4452. {
  4453. if (terminating)
  4454. return false;
  4455. CriticalBlock block(sect);
  4456. if (sock==NULL)
  4457. { // wait until no changes outstanding
  4458. while (selectvarschange)
  4459. {
  4460. waitingchange++;
  4461. CriticalUnblock unblock(sect);
  4462. waitingchangesem.wait();
  4463. }
  4464. return true;
  4465. }
  4466. ForEachItemIn(i,items)
  4467. {
  4468. SelectItem *si = items.element(i);
  4469. if ( (!si->del) && (si->sock==sock) )
  4470. {
  4471. si->del = true;
  4472. selectvarschange = true;
  4473. triggerselect();
  4474. return true;
  4475. }
  4476. }
  4477. return false;
  4478. }
  4479. bool add(ISocket *sock,unsigned mode,ISocketSelectNotify *nfy)
  4480. {
  4481. if ( !sock || !nfy )
  4482. {
  4483. WARNLOG("EPOLL: adding fd but sock or nfy is NULL");
  4484. dbgassertex(false);
  4485. return false;
  4486. }
  4487. // maybe check once to prevent 1st delay? TBD
  4488. CriticalBlock block(sect);
  4489. ForEachItemIn(i,items)
  4490. {
  4491. SelectItem *si = items.element(i);
  4492. if ( !si->del && (si->sock==sock) )
  4493. si->del = true;
  4494. }
  4495. SelectItem *sn = new SelectItem;
  4496. sn->nfy = LINK(nfy);
  4497. sn->sock = LINK(sock);
  4498. sn->mode = (byte)mode;
  4499. sn->handle = (T_SOCKET)sock->OShandle();
  4500. sn->del = false;
  4501. sn->add_epoll = true;
  4502. items.append(sn);
  4503. selectvarschange = true;
  4504. triggerselect();
  4505. return true;
  4506. }
  4507. void updateEpollVars(unsigned &ni)
  4508. {
  4509. CriticalBlock block(sect);
  4510. selectvarschange = false;
  4511. if (waitingchange)
  4512. {
  4513. waitingchangesem.signal(waitingchange);
  4514. waitingchange = 0;
  4515. }
  4516. if (validateselecterror)
  4517. { // something went wrong so check sockets
  4518. validateerrcount++;
  4519. if (!checkSocks())
  4520. {
  4521. // bad socket not found
  4522. PROGLOG("CSocketEpollThread::updateEpollVars cannot find socket error");
  4523. if (validateerrcount>10)
  4524. throw MakeStringException(-1,"CSocketEpollThread:Socket epoll error %d",validateselecterror);
  4525. }
  4526. }
  4527. else
  4528. validateerrcount = 0;
  4529. updateItems();
  4530. #ifndef _USE_PIPE_FOR_SELECT_TRIGGER
  4531. opendummy();
  4532. #endif
  4533. ni = items.ordinality();
  4534. validateselecterror = 0;
  4535. }
  4536. int run()
  4537. {
  4538. try
  4539. {
  4540. unsigned ni = 0;
  4541. unsigned numto = 0;
  4542. unsigned lastnumto = 0;
  4543. unsigned totnum = 0;
  4544. unsigned total = 0;
  4545. selectvarschange = true;
  4546. while (!terminating)
  4547. {
  4548. if (selectvarschange)
  4549. updateEpollVars(ni);
  4550. if (ni==0)
  4551. {
  4552. validateerrcount = 0;
  4553. tickwait++;
  4554. if(!selectvarschange&&!terminating)
  4555. ticksem.wait(SELECT_TIMEOUT_SECS*1000);
  4556. tickwait--;
  4557. continue;
  4558. }
  4559. int n = ::epoll_wait(epfd, epevents, MAX_RET_EVENTS, 1000);
  4560. # ifdef EPOLLTRACE
  4561. if(n > 0)
  4562. DBGLOG("EPOLL: after epoll_wait(), n = %d, ni = %d", n, ni);
  4563. # endif
  4564. if (terminating)
  4565. break;
  4566. if (n < 0)
  4567. {
  4568. CriticalBlock block(sect);
  4569. int err = ERRNO();
  4570. if (err != JSE_INTR)
  4571. {
  4572. if (dummysockopen)
  4573. {
  4574. LOGERR(err,12,"CSocketEpollThread epoll error"); // should cache error ?
  4575. validateselecterror = err;
  4576. #ifndef _USE_PIPE_FOR_SELECT_TRIGGER
  4577. closedummy(); // just in case was culprit
  4578. #endif
  4579. }
  4580. selectvarschange = true;
  4581. continue;
  4582. }
  4583. n = 0;
  4584. }
  4585. else if (n>0)
  4586. {
  4587. validateerrcount = 0;
  4588. numto = 0;
  4589. lastnumto = 0;
  4590. total += n;
  4591. totnum++;
  4592. SelectItemArray tonotify;
  4593. {
  4594. CriticalBlock block(sect);
  4595. for (int j=0;j<n;j++)
  4596. {
  4597. int tfd = -1;
  4598. SelectItem *epsi = (SelectItem *)epevents[j].data.ptr;
  4599. if (epsi)
  4600. tfd = epsi->handle;
  4601. # ifdef EPOLLTRACE
  4602. DBGLOG("EPOLL: epevents[%d].data.fd = %d, emask = %u", j, tfd, epevents[j].events);
  4603. # endif
  4604. if (tfd >= 0)
  4605. {
  4606. # ifdef _USE_PIPE_FOR_SELECT_TRIGGER
  4607. if ( (dummysockopen) && (tfd == dummysock[0]) )
  4608. {
  4609. resettrigger();
  4610. continue;
  4611. }
  4612. # endif
  4613. if (!epsi->del)
  4614. {
  4615. if (!epsi->sock || !epsi->nfy)
  4616. {
  4617. WARNLOG("EPOLL: epevents[%d].data.fd = %d, emask = %u, del = false but sock or nfy is NULL", j, tfd, epevents[j].events);
  4618. }
  4619. else
  4620. {
  4621. unsigned int ep_mode = 0;
  4622. if (epevents[j].events & (EPOLLIN | EPOLLHUP | EPOLLERR))
  4623. ep_mode |= SELECTMODE_READ;
  4624. if (epevents[j].events & EPOLLOUT)
  4625. ep_mode |= SELECTMODE_WRITE;
  4626. if (epevents[j].events & EPOLLPRI)
  4627. ep_mode |= SELECTMODE_EXCEPT;
  4628. if (ep_mode != 0)
  4629. {
  4630. tonotify.append(*epsi);
  4631. SelectItem &itm = tonotify.element(tonotify.length()-1);
  4632. itm.nfy->Link();
  4633. itm.sock->Link();
  4634. #ifdef _TRACELINKCLOSED
  4635. // temporary, to help diagnose spurious socket closes (hpcc-15043)
  4636. // currently no implementation of notifySelected() uses the mode
  4637. // argument so we can pass in the epoll events mask and log that
  4638. // if there is no data and the socket gets closed
  4639. itm.mode = epevents[j].events;
  4640. #else
  4641. itm.mode = ep_mode;
  4642. #endif
  4643. }
  4644. }
  4645. }
  4646. }
  4647. }
  4648. }
  4649. ForEachItemIn(j,tonotify)
  4650. {
  4651. const SelectItem &si = tonotify.item(j);
  4652. try
  4653. {
  4654. si.nfy->notifySelected(si.sock,si.mode); // ignore return
  4655. }
  4656. catch (IException *e)
  4657. { // should be acted upon by notifySelected
  4658. // could also not throw until after handling all events ...
  4659. EXCLOG(e,"CSocketEpollThread notifySelected");
  4660. throw ;
  4661. }
  4662. // Release/dtors should not throw but leaving try/catch here until all paths checked
  4663. try
  4664. {
  4665. si.nfy->Release();
  4666. si.sock->Release();
  4667. }
  4668. catch (IException *e)
  4669. {
  4670. EXCLOG(e,"CSocketEpollThread nfy/sock Release");
  4671. e->Release();
  4672. }
  4673. }
  4674. }
  4675. else
  4676. {
  4677. validateerrcount = 0;
  4678. if ((++numto>=lastnumto*2))
  4679. {
  4680. lastnumto = numto;
  4681. if (selecttrace&&(numto>4))
  4682. PROGLOG("%s: Epoll Idle(%d), %d,%d,%0.2f",selecttrace,numto,totnum,total,totnum?((double)total/(double)totnum):0.0);
  4683. }
  4684. /*
  4685. if (numto&&(numto%100))
  4686. {
  4687. CriticalBlock block(sect);
  4688. if (!selectvarschange)
  4689. selectvarschange = checkSocks();
  4690. }
  4691. */
  4692. }
  4693. }
  4694. }
  4695. catch (IException *e)
  4696. {
  4697. EXCLOG(e,"CSocketEpollThread");
  4698. termexcept.setown(e);
  4699. }
  4700. CriticalBlock block(sect);
  4701. try
  4702. {
  4703. updateItems();
  4704. }
  4705. catch (IException *e)
  4706. {
  4707. EXCLOG(e,"CSocketEpollThread(2)");
  4708. if (!termexcept)
  4709. termexcept.setown(e);
  4710. else
  4711. e->Release();
  4712. }
  4713. return 0;
  4714. }
  4715. };
  4716. class CSocketEpollHandler: implements ISocketSelectHandler, public CInterface
  4717. {
  4718. CSocketEpollThread *epollthread;
  4719. CriticalSection sect;
  4720. StringAttr epolltrace;
  4721. public:
  4722. IMPLEMENT_IINTERFACE;
  4723. CSocketEpollHandler(const char *trc)
  4724. : epolltrace(trc)
  4725. {
  4726. epollthread = new CSocketEpollThread(epolltrace);
  4727. }
  4728. ~CSocketEpollHandler()
  4729. {
  4730. delete epollthread;
  4731. }
  4732. void start()
  4733. {
  4734. CriticalBlock block(sect);
  4735. epollthread->start();
  4736. }
  4737. void add(ISocket *sock,unsigned mode,ISocketSelectNotify *nfy)
  4738. {
  4739. CriticalBlock block(sect); // JCS->MK - are these blocks necessary? epollthread->add() uses it's own CS.
  4740. /* JCS->MK, the CSocketSelectHandler variety, checks result of thread->add and spins up another handler
  4741. * Shouldn't epoll version do the same?
  4742. */
  4743. if (!epollthread->add(sock,mode,nfy))
  4744. throw MakeStringException(-1, "CSocketEpollHandler: failed to add socket to epollthread handler: sock # = %d", sock->OShandle());
  4745. }
  4746. void remove(ISocket *sock)
  4747. {
  4748. CriticalBlock block(sect); // JCS->MK - are these blocks necessary? epollthread->add() uses it's own CS.
  4749. epollthread->remove(sock);
  4750. }
  4751. void stop(bool wait)
  4752. {
  4753. IException *e=NULL;
  4754. epollthread->stop(wait); // not quite as quick as could be if wait true
  4755. if (wait && !e && epollthread->termexcept)
  4756. e = epollthread->termexcept.getClear();
  4757. #if 0 // don't throw error as too late
  4758. if (e)
  4759. throw e;
  4760. #else
  4761. ::Release(e);
  4762. #endif
  4763. }
  4764. };
  4765. #endif // _HAS_EPOLL_SUPPORT
  4766. #ifdef _HAS_EPOLL_SUPPORT
  4767. enum EpollMethod { EPOLL_INIT = 0, EPOLL_DISABLED, EPOLL_ENABLED };
  4768. static EpollMethod epoll_method = EPOLL_INIT;
  4769. static CriticalSection epollsect;
  4770. #endif
  4771. ISocketSelectHandler *createSocketSelectHandler(const char *trc)
  4772. {
  4773. #ifdef _HAS_EPOLL_SUPPORT
  4774. {
  4775. CriticalBlock block(epollsect);
  4776. // DBGLOG("createSocketSelectHandler(): epoll_method = %d",epoll_method);
  4777. if (epoll_method == EPOLL_INIT)
  4778. {
  4779. if (queryEnvironmentConf().getPropBool("use_epoll", true))
  4780. epoll_method = EPOLL_ENABLED;
  4781. else
  4782. epoll_method = EPOLL_DISABLED;
  4783. // DBGLOG("createSocketSelectHandler(): after reading conf file, epoll_method = %d",epoll_method);
  4784. }
  4785. }
  4786. if (epoll_method == EPOLL_ENABLED)
  4787. return new CSocketEpollHandler(trc);
  4788. else
  4789. return new CSocketSelectHandler(trc);
  4790. #else
  4791. return new CSocketSelectHandler(trc);
  4792. #endif
  4793. }
  4794. ISocketSelectHandler *createSocketEpollHandler(const char *trc)
  4795. {
  4796. #ifdef _HAS_EPOLL_SUPPORT
  4797. return new CSocketEpollHandler(trc);
  4798. #else
  4799. return new CSocketSelectHandler(trc);
  4800. #endif
  4801. }
  4802. void readBuffer(ISocket * socket, MemoryBuffer & buffer)
  4803. {
  4804. size32_t len;
  4805. socket->read(&len, sizeof(len));
  4806. _WINREV4(len);
  4807. if (len) {
  4808. void * target = buffer.reserve(len);
  4809. socket->read(target, len);
  4810. }
  4811. }
  4812. void readBuffer(ISocket * socket, MemoryBuffer & buffer, unsigned timeoutms)
  4813. {
  4814. size32_t len;
  4815. size32_t sizeGot;
  4816. socket->readtms(&len, sizeof(len), sizeof(len), sizeGot, timeoutms);
  4817. _WINREV4(len);
  4818. if (len) {
  4819. void * target = buffer.reserve(len);
  4820. socket->readtms(target, len, len, sizeGot, timeoutms);
  4821. }
  4822. }
  4823. void writeBuffer(ISocket * socket, MemoryBuffer & buffer)
  4824. {
  4825. unsigned len = buffer.length();
  4826. _WINREV4(len);
  4827. socket->write(&len, sizeof(len));
  4828. if (len)
  4829. socket->write(buffer.toByteArray(), buffer.length());
  4830. }
  4831. bool catchReadBuffer(ISocket * socket, MemoryBuffer & buffer)
  4832. {
  4833. try
  4834. {
  4835. readBuffer(socket, buffer);
  4836. return true;
  4837. }
  4838. catch (IException * e)
  4839. {
  4840. switch (e->errorCode())
  4841. {
  4842. case JSOCKERR_graceful_close:
  4843. break;
  4844. default:
  4845. EXCLOG(e,"catchReadBuffer");
  4846. break;
  4847. }
  4848. e->Release();
  4849. }
  4850. return false;
  4851. }
  4852. bool catchReadBuffer(ISocket * socket, MemoryBuffer & buffer, unsigned timeoutms)
  4853. {
  4854. try
  4855. {
  4856. readBuffer(socket, buffer, timeoutms);
  4857. return true;
  4858. }
  4859. catch (IException * e)
  4860. {
  4861. switch (e->errorCode())
  4862. {
  4863. case JSOCKERR_graceful_close:
  4864. break;
  4865. default:
  4866. EXCLOG(e,"catchReadBuffer");
  4867. break;
  4868. }
  4869. e->Release();
  4870. }
  4871. return false;
  4872. }
  4873. bool catchWriteBuffer(ISocket * socket, MemoryBuffer & buffer)
  4874. {
  4875. try
  4876. {
  4877. writeBuffer(socket, buffer);
  4878. return true;
  4879. }
  4880. catch (IException * e)
  4881. {
  4882. EXCLOG(e,"catchWriteBuffer");
  4883. e->Release();
  4884. }
  4885. return false;
  4886. }
  4887. // utility interface for simple conversations
  4888. // conversation is always between two ends,
  4889. // at any given time one end must be receiving and other sending (though these may swap during the conversation)
  4890. class CSingletonSocketConnection: implements IConversation, public CInterface
  4891. {
  4892. Owned<ISocket> sock;
  4893. Owned<ISocket> listensock;
  4894. enum { Snone, Saccept, Sconnect, Srecv, Ssend, Scancelled } state;
  4895. bool cancelling;
  4896. SocketEndpoint ep;
  4897. CriticalSection crit;
  4898. public:
  4899. IMPLEMENT_IINTERFACE;
  4900. CSingletonSocketConnection(SocketEndpoint &_ep)
  4901. {
  4902. ep = _ep;
  4903. state = Snone;
  4904. cancelling = false;
  4905. }
  4906. ~CSingletonSocketConnection()
  4907. {
  4908. try {
  4909. if (sock)
  4910. sock->close();
  4911. }
  4912. catch (IException *e) {
  4913. if (e->errorCode()!=JSOCKERR_graceful_close)
  4914. EXCLOG(e,"CSingletonSocketConnection close");
  4915. e->Release();
  4916. }
  4917. }
  4918. void set_keep_alive(bool keepalive)
  4919. {
  4920. if (sock)
  4921. sock->set_keep_alive(keepalive);
  4922. }
  4923. bool connect(unsigned timeoutms)
  4924. {
  4925. CriticalBlock block(crit);
  4926. if (cancelling)
  4927. state = Scancelled;
  4928. if (state==Scancelled)
  4929. return false;
  4930. assertex(!sock);
  4931. ISocket *newsock=NULL;
  4932. state = Sconnect;
  4933. unsigned start = 0;
  4934. if (timeoutms!=(unsigned)INFINITE)
  4935. start = msTick();
  4936. while (state==Sconnect) {
  4937. try {
  4938. CriticalUnblock unblock(crit);
  4939. newsock = ISocket::connect_wait(ep,1000*60*4);
  4940. break;
  4941. }
  4942. catch (IException * e) {
  4943. if ((e->errorCode()==JSOCKERR_timeout_expired)||(e->errorCode()==JSOCKERR_connection_failed)) {
  4944. e->Release();
  4945. if ((state==Sconnect)&&(timeoutms!=(unsigned)INFINITE)&&(msTick()-start>timeoutms)) {
  4946. state = Snone;
  4947. return false;
  4948. }
  4949. }
  4950. else {
  4951. state = Scancelled;
  4952. EXCLOG(e,"CSingletonSocketConnection::connect");
  4953. e->Release();
  4954. return false;
  4955. }
  4956. }
  4957. }
  4958. if (state!=Sconnect) {
  4959. ::Release(newsock);
  4960. newsock = NULL;
  4961. }
  4962. if (!newsock) {
  4963. state = Scancelled;
  4964. return false;
  4965. }
  4966. sock.setown(newsock);
  4967. return true;
  4968. }
  4969. bool send(MemoryBuffer &mb)
  4970. {
  4971. CriticalBlock block(crit);
  4972. if (cancelling)
  4973. state = Scancelled;
  4974. if (state==Scancelled)
  4975. return false;
  4976. assertex(sock);
  4977. state = Srecv;
  4978. try {
  4979. CriticalUnblock unblock(crit);
  4980. writeBuffer(sock,mb);
  4981. }
  4982. catch (IException * e) {
  4983. state = Scancelled;
  4984. EXCLOG(e,"CSingletonSocketConnection::send");
  4985. e->Release();
  4986. return false;
  4987. }
  4988. state = Snone;
  4989. return true;
  4990. }
  4991. unsigned short setRandomPort(unsigned short base, unsigned num)
  4992. {
  4993. for (;;) {
  4994. try {
  4995. ep.port = base+(unsigned short)(getRandom()%num);
  4996. listensock.setown(ISocket::create(ep.port));
  4997. return ep.port;
  4998. }
  4999. catch (IException *e) {
  5000. if (e->errorCode()!=JSOCKERR_port_in_use) {
  5001. state = Scancelled;
  5002. EXCLOG(e,"CSingletonSocketConnection::setRandomPort");
  5003. e->Release();
  5004. break;
  5005. }
  5006. e->Release();
  5007. }
  5008. }
  5009. return 0;
  5010. }
  5011. bool accept(unsigned timeoutms)
  5012. {
  5013. CriticalBlock block(crit);
  5014. if (cancelling)
  5015. state = Scancelled;
  5016. if (state==Scancelled)
  5017. return false;
  5018. if (!sock) {
  5019. ISocket *newsock=NULL;
  5020. state = Saccept;
  5021. for (;;) {
  5022. try {
  5023. {
  5024. CriticalUnblock unblock(crit);
  5025. if (!listensock)
  5026. listensock.setown(ISocket::create(ep.port));
  5027. if ((timeoutms!=(unsigned)INFINITE)&&(!listensock->wait_read(timeoutms))) {
  5028. state = Snone;
  5029. return false;
  5030. }
  5031. }
  5032. if (cancelling)
  5033. state = Scancelled;
  5034. if (state==Scancelled)
  5035. return false;
  5036. {
  5037. CriticalUnblock unblock(crit);
  5038. newsock=listensock->accept(true);
  5039. break;
  5040. }
  5041. }
  5042. catch (IException *e) {
  5043. if (e->errorCode()==JSOCKERR_graceful_close)
  5044. PROGLOG("CSingletonSocketConnection: Closed socket on accept - retrying...");
  5045. else {
  5046. state = Scancelled;
  5047. EXCLOG(e,"CSingletonSocketConnection::accept");
  5048. e->Release();
  5049. break;
  5050. }
  5051. e->Release();
  5052. }
  5053. }
  5054. if (state!=Saccept) {
  5055. ::Release(newsock);
  5056. newsock = NULL;
  5057. }
  5058. if (!newsock) {
  5059. state = Scancelled;
  5060. return false;
  5061. }
  5062. sock.setown(newsock);
  5063. }
  5064. return true;
  5065. }
  5066. bool recv(MemoryBuffer &mb, unsigned timeoutms)
  5067. {
  5068. CriticalBlock block(crit);
  5069. if (cancelling)
  5070. state = Scancelled;
  5071. if (state==Scancelled)
  5072. return false;
  5073. assertex(sock);
  5074. state = Srecv;
  5075. try {
  5076. CriticalUnblock unblock(crit);
  5077. readBuffer(sock,mb,timeoutms);
  5078. }
  5079. catch (IException *e) {
  5080. if (e->errorCode()==JSOCKERR_timeout_expired)
  5081. state = Snone;
  5082. else {
  5083. state = Scancelled;
  5084. if (e->errorCode()!=JSOCKERR_graceful_close)
  5085. EXCLOG(e,"CSingletonSocketConnection::recv");
  5086. }
  5087. e->Release();
  5088. return false;
  5089. }
  5090. state = Snone;
  5091. return true;
  5092. }
  5093. virtual void cancel()
  5094. {
  5095. CriticalBlock block(crit);
  5096. while (state!=Scancelled) {
  5097. cancelling = true;
  5098. try {
  5099. switch (state) {
  5100. case Saccept:
  5101. {
  5102. if (listensock)
  5103. listensock->cancel_accept();
  5104. }
  5105. break;
  5106. case Sconnect:
  5107. // wait for timeout
  5108. break;
  5109. case Srecv:
  5110. {
  5111. if (sock)
  5112. sock->close();
  5113. }
  5114. break;
  5115. case Ssend:
  5116. // wait for finished
  5117. break;
  5118. default:
  5119. state = Scancelled;
  5120. break;
  5121. }
  5122. }
  5123. catch (IException *e) {
  5124. EXCLOG(e,"CSingletonSocketConnection::cancel");
  5125. e->Release();
  5126. }
  5127. {
  5128. CriticalUnblock unblock(crit);
  5129. Sleep(1000);
  5130. }
  5131. }
  5132. }
  5133. };
  5134. IConversation *createSingletonSocketConnection(unsigned short port,SocketEndpoint *_ep)
  5135. {
  5136. SocketEndpoint ep;
  5137. if (_ep)
  5138. ep = *_ep;
  5139. if (port)
  5140. ep.port = port;
  5141. return new CSingletonSocketConnection(ep);
  5142. }
  5143. // interface for reading from multiple sockets using the BF_SYNC_TRANSFER_PUSH protocol
  5144. class CSocketBufferReader: implements ISocketBufferReader, public CInterface
  5145. {
  5146. class SocketElem: implements ISocketSelectNotify, public CInterface
  5147. {
  5148. CSocketBufferReader *parent;
  5149. unsigned num; // top bit used for ready
  5150. MemoryAttr blk;
  5151. CriticalSection sect;
  5152. Linked<ISocket> sock;
  5153. bool active;
  5154. bool pending;
  5155. public:
  5156. IMPLEMENT_IINTERFACE;
  5157. void init(CSocketBufferReader *_parent,ISocket *_sock,unsigned _n)
  5158. {
  5159. parent = _parent;
  5160. num = _n;
  5161. sock.set(_sock);
  5162. active = true;
  5163. pending = false;
  5164. }
  5165. virtual bool notifySelected(ISocket *socket,unsigned selected)
  5166. {
  5167. assertex(sock==socket);
  5168. {
  5169. CriticalBlock block(sect);
  5170. if (pending) {
  5171. active = false;
  5172. parent->remove(sock);
  5173. return false;
  5174. }
  5175. pending = true;
  5176. unsigned t1=usTick();
  5177. size32_t sz = sock->receive_block_size();
  5178. unsigned t2=usTick();
  5179. if (sz)
  5180. sock->receive_block(blk.allocate(sz),sz);
  5181. else
  5182. parent->remove(sock);
  5183. unsigned t3=usTick();
  5184. if (t3-t1>60*1000000)
  5185. PROGLOG("CSocketBufferReader(%d): slow receive_block (%d,%d) sz=%d",num,t2-t1,t3-t2,sz);
  5186. }
  5187. parent->enqueue(this); // nb outside sect critical block
  5188. return false; // always return false
  5189. }
  5190. unsigned get(MemoryBuffer &mb)
  5191. {
  5192. CriticalBlock block(sect);
  5193. assertex(pending);
  5194. size32_t sz = (size32_t)blk.length();
  5195. if (sz)
  5196. mb.setBuffer(sz,blk.detach(),true);
  5197. pending = false;
  5198. if (!active) {
  5199. active = true;
  5200. parent->add(*this);
  5201. }
  5202. return num;
  5203. }
  5204. size32_t size()
  5205. {
  5206. return (size32_t)blk.length();
  5207. }
  5208. ISocket *getSocket() { return sock; }
  5209. } *elems;
  5210. SimpleInterThreadQueueOf<SocketElem, false> readyq;
  5211. Owned<ISocketSelectHandler> selecthandler;
  5212. size32_t buffersize;
  5213. size32_t buffermax;
  5214. unsigned bufferwaiting;
  5215. CriticalSection buffersect;
  5216. Semaphore buffersem;
  5217. bool isdone;
  5218. public:
  5219. IMPLEMENT_IINTERFACE;
  5220. CSocketBufferReader(const char *trc)
  5221. {
  5222. selecthandler.setown(createSocketSelectHandler(trc));
  5223. elems = NULL;
  5224. }
  5225. ~CSocketBufferReader()
  5226. {
  5227. delete [] elems;
  5228. }
  5229. virtual void init(unsigned num,ISocket **sockets,size32_t _buffermax)
  5230. {
  5231. elems = new SocketElem[num];
  5232. for (unsigned i=0;i<num;i++) {
  5233. ISocket *sock = sockets[i];
  5234. if (sock) { // can have gaps
  5235. elems[i].init(this,sock,i);
  5236. add(elems[i]);
  5237. }
  5238. }
  5239. buffersize = 0;
  5240. buffermax = _buffermax;
  5241. bufferwaiting = 0;
  5242. isdone = false;
  5243. selecthandler->start();
  5244. }
  5245. virtual unsigned get(MemoryBuffer &mb)
  5246. {
  5247. SocketElem &e = *readyq.dequeue();
  5248. CriticalBlock block(buffersect);
  5249. assertex(buffersize>=e.size());
  5250. buffersize-=e.size();
  5251. if (bufferwaiting) {
  5252. buffersem.signal(bufferwaiting);
  5253. bufferwaiting = 0;
  5254. }
  5255. return e.get(mb);
  5256. }
  5257. virtual void done(bool wait)
  5258. {
  5259. buffersem.signal(0x10000);
  5260. isdone = true;
  5261. selecthandler->stop(wait);
  5262. if (wait) {
  5263. delete [] elems;
  5264. elems = NULL;
  5265. }
  5266. }
  5267. void enqueue(SocketElem *elem)
  5268. {
  5269. if (elem) {
  5270. CriticalBlock block(buffersect);
  5271. size32_t sz = elem->size();
  5272. while ((buffersize>0)&&(sz>0)&&(buffersize+sz>buffermax)) {
  5273. if (isdone)
  5274. return;
  5275. bufferwaiting++;
  5276. CriticalUnblock unblock(buffersect);
  5277. buffersem.wait();
  5278. }
  5279. buffersize += sz;
  5280. }
  5281. readyq.enqueue(elem);
  5282. }
  5283. void remove(ISocket *sock)
  5284. {
  5285. selecthandler->remove(sock);
  5286. }
  5287. void add(SocketElem &elem)
  5288. {
  5289. selecthandler->add(elem.getSocket(),SELECTMODE_READ,&elem);
  5290. }
  5291. };
  5292. ISocketBufferReader *createSocketBufferReader(const char *trc)
  5293. {
  5294. return new CSocketBufferReader(trc);
  5295. }
  5296. extern jlib_decl void markNodeCentral(SocketEndpoint &ep)
  5297. {
  5298. #ifdef CENTRAL_NODE_RANDOM_DELAY
  5299. CriticalBlock block(CSocket::crit);
  5300. CentralNodeArray.append(ep);
  5301. #endif
  5302. }
  5303. static CSocket *prepareSocket(unsigned idx,const SocketEndpoint &ep, ISocketConnectNotify &inotify)
  5304. {
  5305. Owned<CSocket> sock = new CSocket(ep,sm_tcp,NULL);
  5306. int err = sock->pre_connect(false);
  5307. if ((err == JSE_INPROGRESS)||(err == JSE_WOULDBLOCK))
  5308. return sock.getClear();
  5309. if (err==0) {
  5310. int err = sock->post_connect();
  5311. if (err==0)
  5312. inotify.connected(idx,ep,sock);
  5313. else {
  5314. sock->errclose();
  5315. inotify.failed(idx,ep,err);
  5316. }
  5317. }
  5318. else
  5319. inotify.failed(idx,ep,err);
  5320. return NULL;
  5321. }
  5322. void multiConnect(const SocketEndpointArray &eps,ISocketConnectNotify &inotify,unsigned timeout)
  5323. {
  5324. class SocketElem: implements ISocketSelectNotify, public CInterface
  5325. {
  5326. CriticalSection *sect;
  5327. ISocketSelectHandler *handler;
  5328. unsigned *remaining;
  5329. Semaphore *notifysem;
  5330. ISocketConnectNotify *inotify;
  5331. public:
  5332. Owned<CSocket> sock;
  5333. SocketEndpoint ep;
  5334. unsigned idx;
  5335. IMPLEMENT_IINTERFACE;
  5336. void init(CSocket *_sock,unsigned _idx,const SocketEndpoint &_ep,CriticalSection *_sect,ISocketSelectHandler *_handler,ISocketConnectNotify *_inotify, unsigned *_remaining, Semaphore *_notifysem)
  5337. {
  5338. ep = _ep;
  5339. idx = _idx;
  5340. inotify = _inotify;
  5341. sock.setown(_sock),
  5342. sect = _sect;
  5343. handler = _handler;
  5344. remaining = _remaining;
  5345. notifysem = _notifysem;
  5346. }
  5347. virtual bool notifySelected(ISocket *socket,unsigned selected)
  5348. {
  5349. CriticalBlock block(*sect);
  5350. handler->remove(socket);
  5351. int err = sock->post_connect();
  5352. CSocket *newsock = NULL;
  5353. {
  5354. CriticalUnblock unblock(*sect); // up to caller to cope with multithread
  5355. if (err==0)
  5356. inotify->connected(idx,ep,sock);
  5357. else if ((err==JSE_TIMEDOUT)||(err==JSE_CONNREFUSED)) {
  5358. // don't give up so easily (maybe listener not yet started (i.e. racing))
  5359. newsock = prepareSocket(idx,ep,*inotify);
  5360. Sleep(100); // not very nice but without this would just loop
  5361. }
  5362. else
  5363. inotify->failed(idx,ep,err);
  5364. }
  5365. if (newsock) {
  5366. sock.setown(newsock);
  5367. handler->add(sock,SELECTMODE_WRITE|SELECTMODE_EXCEPT,this);
  5368. }
  5369. else {
  5370. sock.clear();
  5371. (*remaining)--;
  5372. notifysem->signal();
  5373. }
  5374. return false;
  5375. }
  5376. } *elems;
  5377. unsigned n = eps.ordinality();
  5378. unsigned remaining = n;
  5379. if (!n)
  5380. return;
  5381. elems = new SocketElem[n];
  5382. unsigned i;
  5383. CriticalSection sect;
  5384. Semaphore notifysem;
  5385. Owned<ISocketSelectHandler> selecthandler = createSocketSelectHandler(
  5386. #ifdef _DEBUG
  5387. "multiConnect"
  5388. #else
  5389. NULL
  5390. #endif
  5391. );
  5392. StringBuffer name;
  5393. for (i=0;i<n;i++) {
  5394. CSocket* sock = prepareSocket(i,eps.item(i),inotify);
  5395. if (sock) {
  5396. elems[i].init(sock,i,eps.item(i),&sect,selecthandler,&inotify,&remaining,&notifysem);
  5397. selecthandler->add(sock,SELECTMODE_WRITE|SELECTMODE_EXCEPT,&elems[i]);
  5398. }
  5399. else
  5400. remaining--;
  5401. }
  5402. if (remaining) {
  5403. unsigned lastremaining=remaining;
  5404. selecthandler->start();
  5405. for (;;) {
  5406. bool to=!notifysem.wait(timeout);
  5407. {
  5408. CriticalBlock block(sect);
  5409. if (remaining==0)
  5410. break;
  5411. if (to&&(remaining==lastremaining))
  5412. break; // nothing happened recently
  5413. lastremaining = remaining;
  5414. }
  5415. }
  5416. selecthandler->stop(true);
  5417. }
  5418. selecthandler.clear();
  5419. if (remaining) {
  5420. for (unsigned j=0;j<n;j++) { // mop up timeouts
  5421. SocketElem &elem = elems[j];
  5422. if (elem.sock.get()) {
  5423. elem.sock.clear();
  5424. inotify.failed(j,elem.ep,-1);
  5425. remaining--;
  5426. if (remaining==0)
  5427. break;
  5428. }
  5429. }
  5430. }
  5431. delete [] elems;
  5432. }
  5433. void multiConnect(const SocketEndpointArray &eps, IPointerArrayOf<ISocket> &retsockets,unsigned timeout)
  5434. {
  5435. unsigned n = eps.ordinality();
  5436. if (n==0)
  5437. return;
  5438. if (n==1) { // no need for multi
  5439. ISocket *sock = NULL;
  5440. try {
  5441. sock = ISocket::connect_timeout(eps.item(0),timeout);
  5442. }
  5443. catch (IException *e) { // ignore error just append NULL
  5444. sock = NULL;
  5445. e->Release();
  5446. }
  5447. retsockets.append(sock);
  5448. return;
  5449. }
  5450. while (retsockets.ordinality()<n)
  5451. retsockets.append(NULL);
  5452. CriticalSection sect;
  5453. class cNotify: implements ISocketConnectNotify
  5454. {
  5455. CriticalSection &sect;
  5456. IPointerArrayOf<ISocket> &retsockets;
  5457. public:
  5458. cNotify(IPointerArrayOf<ISocket> &_retsockets,CriticalSection &_sect)
  5459. : sect(_sect), retsockets(_retsockets)
  5460. {
  5461. }
  5462. void connected(unsigned idx,const SocketEndpoint &ep,ISocket *sock)
  5463. {
  5464. CriticalBlock block(sect);
  5465. assertex(idx<retsockets.ordinality());
  5466. sock->Link();
  5467. retsockets.replace(sock,idx);
  5468. }
  5469. void failed(unsigned idx,const SocketEndpoint &ep,int err)
  5470. {
  5471. StringBuffer s;
  5472. PROGLOG("multiConnect failed to %s with %d",ep.getUrlStr(s).str(),err);
  5473. }
  5474. } notify(retsockets,sect);
  5475. multiConnect(eps,notify,timeout);
  5476. }
  5477. inline void flushText(StringBuffer &text,unsigned short port,unsigned &rep,unsigned &range)
  5478. {
  5479. if (rep) {
  5480. text.append('*').append(rep+1);
  5481. rep = 0;
  5482. }
  5483. else if (range) {
  5484. text.append('-').append(range);
  5485. range = 0;
  5486. }
  5487. if (port)
  5488. text.append(':').append(port);
  5489. }
  5490. StringBuffer &SocketEndpointArray::getText(StringBuffer &text)
  5491. {
  5492. unsigned count = ordinality();
  5493. if (!count)
  5494. return text;
  5495. if (count==1)
  5496. return item(0).getUrlStr(text);
  5497. byte lastip[4];
  5498. const SocketEndpoint &first = item(0);
  5499. bool lastis4 = first.getNetAddress(sizeof(lastip),&lastip)==sizeof(lastip);
  5500. unsigned short lastport = first.port;
  5501. first.getIpText(text);
  5502. unsigned rep=0;
  5503. unsigned range=0;
  5504. for (unsigned i=1;i<count;i++) {
  5505. byte ip[4];
  5506. const SocketEndpoint &ep = item(i);
  5507. bool is4 = ep.getNetAddress(sizeof(ip),&ip)==sizeof(ip);
  5508. if (!lastis4||!is4) {
  5509. flushText(text,lastport,rep,range);
  5510. text.append(',');
  5511. ep.getIpText(text);
  5512. }
  5513. else { // try and shorten
  5514. unsigned j;
  5515. for (j=0;j<4;j++)
  5516. if (ip[j]!=lastip[j])
  5517. break;
  5518. if (ep.port==lastport) {
  5519. if (j==4) {
  5520. if (range) // cant have range and rep
  5521. j--; // pretend only 3 matched
  5522. else {
  5523. rep++;
  5524. continue;
  5525. }
  5526. }
  5527. else if ((j==3)&&(lastip[3]+1==ip[3])&&(rep==0)) {
  5528. range = ip[3];
  5529. lastip[3] = (byte)range;
  5530. continue;
  5531. }
  5532. }
  5533. flushText(text,lastport,rep,range);
  5534. // output diff
  5535. text.append(',');
  5536. if (j==4)
  5537. j--;
  5538. for (unsigned k=j;k<4;k++) {
  5539. if (k>j)
  5540. text.append('.');
  5541. text.append((int)ip[k]);
  5542. }
  5543. }
  5544. memcpy(&lastip,&ip,sizeof(lastip));
  5545. lastis4 = is4;
  5546. lastport = ep.port;
  5547. }
  5548. flushText(text,lastport,rep,range);
  5549. return text;
  5550. }
  // Parses a run of decimal digits at `s` into `n` (0 when there are none) and
  // returns a pointer to the first character that is not a digit.
  // The value is accumulated in an unsigned, so an over-long run wraps.
  inline const char *getnum(const char *s,unsigned &n)
  {
      n = 0;
      // <ctype.h> functions require a value representable as unsigned char (or
      // EOF); a plain char with the high bit set would otherwise be undefined.
      while (isdigit((unsigned char)*s)) {
          n = n*10+(unsigned)(*s-'0');
          s++;
      }
      return s;
  }
  5560. inline bool appendv4range(SocketEndpointArray *array,char *str,SocketEndpoint &ep, unsigned defport)
  5561. {
  5562. char *s = str;
  5563. unsigned dc = 0;
  5564. unsigned port = defport;
  5565. unsigned rng = 0;
  5566. unsigned rep = 1;
  5567. bool notip = false;
  5568. while (*s) {
  5569. if (*s=='.') {
  5570. dc++;
  5571. s++;
  5572. }
  5573. else if (*s==':') {
  5574. *s = 0;
  5575. s = (char *)getnum(s+1,port);
  5576. }
  5577. else if (*s=='-')
  5578. {
  5579. if (!notip)//don't assume '-' is an ip range delimiter
  5580. { //Allow '-' in any octet due to ip group support
  5581. *s = 0;
  5582. s = (char *)getnum(s+1,rng);
  5583. }
  5584. else
  5585. {
  5586. notip = true;
  5587. s++;
  5588. }
  5589. }
  5590. else if (*s=='*') {
  5591. *s = 0;
  5592. s = (char *)getnum(s+1,rep);
  5593. }
  5594. else {
  5595. if (!isdigit(*s))
  5596. notip = true;
  5597. s++;
  5598. }
  5599. }
  5600. ep.port = port;
  5601. if (*str) {
  5602. if (!notip&&((dc<3)&&((dc!=1)||(strlen(str)!=1)))) {
  5603. if (!ep.isIp4()) {
  5604. return false;
  5605. }
  5606. StringBuffer tmp;
  5607. ep.getIpText(tmp);
  5608. dc++;
  5609. for (;;) {
  5610. if (tmp.length()==0)
  5611. return false;
  5612. if (tmp.charAt(tmp.length()-1)=='.')
  5613. if (--dc==0)
  5614. break;
  5615. tmp.setLength(tmp.length()-1);
  5616. }
  5617. tmp.append(str);
  5618. if (rng) {
  5619. tmp.appendf("-%d",rng);
  5620. rep = ep.ipsetrange(tmp.str());
  5621. }
  5622. else
  5623. ep.ipset(tmp.str());
  5624. }
  5625. else if (rng) { // not nice as have to add back range (must be better way - maybe ipincrementto) TBD
  5626. StringBuffer tmp;
  5627. tmp.appendf("%s-%d",str,rng);
  5628. rep = ep.ipsetrange(tmp.str());
  5629. }
  5630. else if (*str)
  5631. ep.ipset(str);
  5632. if (ep.isNull())
  5633. ep.port = 0;
  5634. for (unsigned i=0;i<rep;i++) {
  5635. array->append(ep);
  5636. if (rng)
  5637. ep.ipincrement(1);
  5638. }
  5639. }
  5640. else {// just a port change
  5641. if (ep.isNull()) // avoid null values with ports
  5642. ep.port = 0;
  5643. array->append(ep);
  5644. }
  5645. return true;
  5646. }
  5647. void SocketEndpointArray::fromText(const char *text,unsigned defport)
  5648. {
  5649. // this is quite complicated with (mixed) IPv4 and IPv6
  5650. // only support 'full' IPv6 and no ranges
  5651. if (!text)
  5652. return;
  5653. char *str = strdup(text);
  5654. char *s = str;
  5655. SocketEndpoint ep;
  5656. bool eol = false;
  5657. for (;;) {
  5658. while (isspace(*s)||(*s==','))
  5659. s++;
  5660. if (!*s)
  5661. break;
  5662. char *e=s;
  5663. if (*e=='[') { // we have a IPv6
  5664. while (*e&&(*e!=']'))
  5665. e++;
  5666. while ((*e!=',')&&!isspace(*e)) {
  5667. if (!*s) {
  5668. eol = true;
  5669. break;
  5670. }
  5671. e++;
  5672. }
  5673. *e = 0;
  5674. ep.set(s,defport);
  5675. if (ep.isNull()) {
  5676. // Error TBD
  5677. }
  5678. append(ep);
  5679. }
  5680. else {
  5681. bool hascolon = false;
  5682. bool isv6 = false;
  5683. do {
  5684. if (*e==':') {
  5685. if (hascolon)
  5686. isv6 = true;
  5687. else
  5688. hascolon = true;
  5689. }
  5690. e++;
  5691. if (!*e) {
  5692. eol = true;
  5693. break;
  5694. }
  5695. } while (!isspace(*e)&&(*e!=','));
  5696. *e = 0;
  5697. if (isv6) {
  5698. ep.set(s,defport);
  5699. if (ep.isNull()) {
  5700. // Error TBD
  5701. }
  5702. append(ep);
  5703. }
  5704. else {
  5705. if (!appendv4range(this,s,ep,defport)) {
  5706. // Error TBD
  5707. }
  5708. }
  5709. }
  5710. if (eol)
  5711. break;
  5712. s = e+1;
  5713. }
  5714. free(str);
  5715. }
  5716. bool IpSubNet::set(const char *_net,const char *_mask)
  5717. {
  5718. if (!_net||!decodeNumericIP(_net,net)) { // _net NULL means match everything
  5719. memset(net,0,sizeof(net));
  5720. memset(mask,0,sizeof(mask));
  5721. return (_net==NULL);
  5722. }
  5723. if (!_mask||!decodeNumericIP(_mask,mask)) { // _mask NULL means match exact
  5724. memset(mask,0xff,sizeof(mask));
  5725. return (_mask==NULL);
  5726. }
  5727. if (isIp4(net)!=isIp4(mask))
  5728. return false;
  5729. for (unsigned j=0;j<4;j++)
  5730. if (net[j]&~mask[j])
  5731. return false;
  5732. return true;
  5733. }
  5734. bool IpSubNet::test(const IpAddress &ip) const
  5735. {
  5736. unsigned i;
  5737. if (ip.getNetAddress(sizeof(i),&i)==sizeof(i)) {
  5738. if (!isIp4(net))
  5739. return false;
  5740. return (i&mask[3])==(net[3]&mask[3]);
  5741. }
  5742. unsigned na[4];
  5743. if (ip.getNetAddress(sizeof(na),&na)==sizeof(na)) {
  5744. for (unsigned j=0;j<4;j++)
  5745. if ((na[j]&mask[j])!=(net[j]&mask[j]))
  5746. return false;
  5747. return true;
  5748. }
  5749. return false;
  5750. }
  5751. StringBuffer IpSubNet::getNetText(StringBuffer &text) const
  5752. {
  5753. char tmp[INET6_ADDRSTRLEN];
  5754. const char *res = ::isIp4(net) ? _inet_ntop(AF_INET, &net[3], tmp, sizeof(tmp))
  5755. : _inet_ntop(AF_INET6, &net, tmp, sizeof(tmp));
  5756. return text.append(res);
  5757. }
  5758. StringBuffer IpSubNet::getMaskText(StringBuffer &text) const
  5759. {
  5760. char tmp[INET6_ADDRSTRLEN];
  5761. // isIp4(net) is correct here
  5762. const char *res = ::isIp4(net) ? _inet_ntop(AF_INET, &mask[3], tmp, sizeof(tmp))
  5763. : _inet_ntop(AF_INET6, &mask, tmp, sizeof(tmp));
  5764. return text.append(res);
  5765. }
  5766. bool IpSubNet::isNull() const
  5767. {
  5768. for (unsigned i=0;i<4;i++)
  5769. if (net[i]||mask[i])
  5770. return false;
  5771. return true;
  5772. }
  5773. IpSubNet &queryPreferredSubnet()
  5774. {
  5775. return PreferredSubnet;
  5776. }
  5777. bool setPreferredSubnet(const char *ip,const char *mask)
  5778. {
  5779. // also resets cached host IP
  5780. if (PreferredSubnet.set(ip,mask))
  5781. {
  5782. if (!cachehostip.isNull())
  5783. {
  5784. cachehostip.ipset(NULL);
  5785. queryHostIP();
  5786. }
  5787. return true;
  5788. }
  5789. else
  5790. return false;
  5791. }
  5792. StringBuffer lookupHostName(const IpAddress &ip,StringBuffer &ret)
  5793. {
  5794. // not a common routine (no Jlib function!) only support IPv4 initially
  5795. unsigned ipa;
  5796. if (ip.getNetAddress(sizeof(ipa),&ipa)==sizeof(ipa)) {
  5797. struct hostent *phostent = gethostbyaddr( (char *) &ipa, sizeof(ipa), PF_INET);
  5798. if (phostent)
  5799. ret.append(phostent->h_name);
  5800. else
  5801. ip.getIpText(ret);
  5802. }
  5803. else
  5804. ip.getIpText(ret);
  5805. return ret;
  5806. }
  5807. struct SocketEndpointHTElem
  5808. {
  5809. IInterface *ii;
  5810. SocketEndpoint ep;
  5811. SocketEndpointHTElem(const SocketEndpoint _ep,IInterface *_ii) { ep.set(_ep); ii = _ii; }
  5812. ~SocketEndpointHTElem() { ::Release(ii); }
  5813. };
  5814. class jlib_decl CSocketEndpointHashTable : public SuperHashTableOf<SocketEndpointHTElem, SocketEndpoint>, implements ISocketEndpointHashTable
  5815. {
  5816. virtual void onAdd(void *) {}
  5817. virtual void onRemove(void *e) { delete (SocketEndpointHTElem *)e; }
  5818. unsigned getHashFromElement(const void *e) const
  5819. {
  5820. return ((const SocketEndpointHTElem *)e)->ep.hash(0);
  5821. }
  5822. unsigned getHashFromFindParam(const void *fp) const
  5823. {
  5824. return ((const SocketEndpoint *)fp)->hash(0);
  5825. }
  5826. const void * getFindParam(const void *p) const
  5827. {
  5828. return &((const SocketEndpointHTElem *)p)->ep;
  5829. }
  5830. bool matchesFindParam(const void * et, const void *fp, unsigned) const
  5831. {
  5832. return ((const SocketEndpointHTElem *)et)->ep.equals(*(SocketEndpoint *)fp);
  5833. }
  5834. IMPLEMENT_SUPERHASHTABLEOF_REF_FIND(SocketEndpointHTElem,SocketEndpoint);
  5835. public:
  5836. IMPLEMENT_IINTERFACE;
  5837. CSocketEndpointHashTable() {}
  5838. ~CSocketEndpointHashTable() { _releaseAll(); }
  5839. void add(const SocketEndpoint &ep, IInterface *i)
  5840. {
  5841. SocketEndpointHTElem *e = SuperHashTableOf<SocketEndpointHTElem,SocketEndpoint>::find(&ep);
  5842. if (e) {
  5843. ::Release(e->ii);
  5844. e->ii = i;
  5845. }
  5846. else {
  5847. e = new SocketEndpointHTElem(ep,i);
  5848. SuperHashTableOf<SocketEndpointHTElem,SocketEndpoint>::add(*e);
  5849. }
  5850. }
  5851. void remove(const SocketEndpoint &ep)
  5852. {
  5853. SuperHashTableOf<SocketEndpointHTElem,SocketEndpoint>::remove(&ep);
  5854. }
  5855. IInterface *find(const SocketEndpoint &ep)
  5856. {
  5857. SocketEndpointHTElem *e = SuperHashTableOf<SocketEndpointHTElem,SocketEndpoint>::find(&ep);
  5858. if (e)
  5859. return e->ii;
  5860. return NULL;
  5861. }
  5862. };
  5863. ISocketEndpointHashTable *createSocketEndpointHashTable()
  5864. {
  5865. CSocketEndpointHashTable *ht = new CSocketEndpointHashTable;
  5866. return ht;
  5867. }
  5868. class CSocketConnectWait: implements ISocketConnectWait, public CInterface
  5869. {
  5870. Owned<CSocket> sock;
  5871. bool done;
  5872. CTimeMon connecttm;
  5873. unsigned startt;
  5874. bool oneshot;
  5875. bool isopen;
  5876. int initerr;
  5877. void successfulConnect()
  5878. {
  5879. STATS.connects++;
  5880. STATS.connecttime+=usTick()-startt;
  5881. #ifdef _TRACE
  5882. sock->setTraceName();
  5883. #endif
  5884. }
  5885. void failedConnect()
  5886. {
  5887. STATS.failedconnects++;
  5888. STATS.failedconnecttime+=usTick()-startt;
  5889. const char* tracename = sock->tracename;
  5890. THROWJSOCKEXCEPTION(JSOCKERR_connection_failed);
  5891. }
  5892. public:
  5893. IMPLEMENT_IINTERFACE;
  5894. CSocketConnectWait(SocketEndpoint &ep,unsigned connecttimeoutms)
  5895. : connecttm(connecttimeoutms)
  5896. {
  5897. oneshot = (connecttimeoutms==0); // i.e. as long as one connect takes
  5898. done = false;
  5899. startt = usTick();
  5900. sock.setown(new CSocket(ep,sm_tcp,NULL));
  5901. isopen = true;
  5902. initerr = sock->pre_connect(false);
  5903. }
  5904. ISocket *wait(unsigned timems)
  5905. {
  5906. // this is a bit spagetti due to dual timeouts etc
  5907. CTimeMon waittm(timems);
  5908. unsigned refuseddelay = 1;
  5909. bool waittimedout = false;
  5910. bool connectimedout = false;
  5911. do {
  5912. bool connectdone = false;
  5913. unsigned remaining;
  5914. connectimedout = connecttm.timedout(&remaining);
  5915. unsigned waitremaining;
  5916. waittimedout = waittm.timedout(&waitremaining);
  5917. if (oneshot||(waitremaining<remaining))
  5918. remaining = waitremaining;
  5919. int err = 0;
  5920. if (!isopen||initerr)
  5921. {
  5922. isopen = true;
  5923. err = initerr?initerr:sock->pre_connect(false);
  5924. initerr = 0;
  5925. if ((err == JSE_INPROGRESS)||(err == JSE_WOULDBLOCK))
  5926. err = 0; // continue
  5927. else
  5928. {
  5929. if (err==0)
  5930. connectdone = true; // done immediately
  5931. else if(!oneshot) // probably ECONNREFUSED but treat all errors same
  5932. refused_sleep((waitremaining==remaining)?waittm:connecttm,refuseddelay); // this stops becoming cpu bound
  5933. }
  5934. }
  5935. if (!connectdone&&(err==0))
  5936. {
  5937. SOCKET s = sock->sock;
  5938. #ifdef _USE_SELECT
  5939. CHECKSOCKRANGE(s);
  5940. T_FD_SET fds;
  5941. struct timeval tv;
  5942. XFD_ZERO(&fds);
  5943. FD_SET((unsigned)s, &fds);
  5944. T_FD_SET except;
  5945. XFD_ZERO(&except);
  5946. FD_SET((unsigned)s, &except);
  5947. tv.tv_sec = remaining / 1000;
  5948. tv.tv_usec = (remaining % 1000)*1000;
  5949. int rc = ::select( s + 1, NULL, (fd_set *)&fds, (fd_set *)&except, &tv );
  5950. #else
  5951. struct pollfd fds[1];
  5952. fds[0].fd = s;
  5953. fds[0].events = POLLOUT;
  5954. fds[0].revents = 0;
  5955. int rc = ::poll(fds, 1, remaining);
  5956. #endif
  5957. if (rc==0)
  5958. break; // timeout
  5959. done = true;
  5960. err = 0;
  5961. if (rc>0)
  5962. {
  5963. // select/poll succeeded - return error from socket (0 if connected)
  5964. socklen_t errlen = sizeof(err);
  5965. rc = getsockopt(s, SOL_SOCKET, SO_ERROR, (char *)&err, &errlen); // check for error
  5966. if ((rc!=0)&&!err)
  5967. err = ERRNO(); // some implementations of getsockopt duff
  5968. if (err&&!oneshot) // probably ECONNREFUSED but treat all errors same
  5969. refused_sleep((waitremaining==remaining)?waittm:connecttm,refuseddelay); // this stops becoming cpu bound
  5970. }
  5971. else
  5972. { // select/poll failed
  5973. err = ERRNO();
  5974. LOGERR(err,2,"CSocketConnectWait ::select/poll");
  5975. }
  5976. }
  5977. if (err==0)
  5978. {
  5979. err = sock->post_connect();
  5980. if (err==0)
  5981. {
  5982. successfulConnect();
  5983. return sock.getClear();
  5984. }
  5985. }
  5986. sock->errclose();
  5987. isopen = false;
  5988. } while (!waittimedout&&!oneshot);
  5989. if (connectimedout)
  5990. {
  5991. STATS.failedconnects++;
  5992. STATS.failedconnecttime+=usTick()-startt;
  5993. const char* tracename = sock->tracename;
  5994. THROWJSOCKEXCEPTION(JSOCKERR_connection_failed);
  5995. }
  5996. return NULL;
  5997. }
  5998. };
  5999. ISocketConnectWait *nonBlockingConnect(SocketEndpoint &ep,unsigned connecttimeoutms)
  6000. {
  6001. return new CSocketConnectWait(ep,connecttimeoutms);
  6002. }
  6003. int wait_multiple(bool isRead, //IN true if wait read, false it wait write
  6004. UnsignedArray &socks, //IN sockets to be checked for readiness
  6005. unsigned timeoutMS, //IN timeout
  6006. UnsignedArray &readySocks) //OUT sockets ready
  6007. {
  6008. aindex_t numSocks = socks.length();
  6009. if (numSocks == 0)
  6010. THROWJSOCKEXCEPTION2(JSOCKERR_bad_address);
  6011. #ifdef _DEBUG
  6012. StringBuffer dbgSB("wait_multiple() on sockets :");
  6013. #endif
  6014. #ifdef _USE_SELECT
  6015. SOCKET maxSocket = 0;
  6016. T_FD_SET fds;
  6017. XFD_ZERO(&fds);
  6018. //Add each SOCKET in array to T_FD_SET
  6019. for (aindex_t idx = 0; idx < numSocks; idx++)
  6020. {
  6021. #ifdef _DEBUG
  6022. dbgSB.appendf(" %d",socks.item(idx));
  6023. #endif
  6024. SOCKET s = socks.item(idx);
  6025. CHECKSOCKRANGE(s);
  6026. maxSocket = s > maxSocket ? s : maxSocket;
  6027. FD_SET((unsigned)s, &fds);
  6028. }
  6029. #else
  6030. struct pollfd *fds = nullptr;
  6031. try
  6032. {
  6033. fds = new pollfd[numSocks];
  6034. }
  6035. catch (const std::bad_alloc &e)
  6036. {
  6037. int err = ERRNO();
  6038. throw MakeStringException(-1,"wait_multiple::fds malloc failure %d", err);
  6039. }
  6040. for (aindex_t idx = 0; idx < numSocks; idx++)
  6041. {
  6042. #ifdef _DEBUG
  6043. dbgSB.appendf(" %d",socks.item(idx));
  6044. #endif
  6045. SOCKET s = socks.item(idx);
  6046. fds[idx].fd = s;
  6047. fds[idx].events = isRead ? POLLIN : POLLOUT;
  6048. fds[idx].revents = 0;
  6049. }
  6050. #endif
  6051. #ifdef _DEBUG
  6052. DBGLOG("%s",dbgSB.str());
  6053. #endif
  6054. //Check socket states
  6055. int res = 0;
  6056. #ifdef _USE_SELECT
  6057. if (timeoutMS == WAIT_FOREVER)
  6058. res = ::select( maxSocket + 1, isRead ? (fd_set *)&fds : NULL, isRead ? NULL : (fd_set *)&fds, NULL, NULL );
  6059. else
  6060. {
  6061. struct timeval tv;
  6062. tv.tv_sec = timeoutMS / 1000;
  6063. tv.tv_usec = (timeoutMS % 1000)*1000;
  6064. res = ::select( maxSocket + 1, isRead ? (fd_set *)&fds : NULL, isRead ? NULL : (fd_set *)&fds, NULL, &tv );
  6065. }
  6066. #else
  6067. res = poll(fds, numSocks, timeoutMS);
  6068. #endif
  6069. if (res > 0)
  6070. {
  6071. #ifdef _DEBUG
  6072. StringBuffer dbgSB("wait_multiple() ready socket(s) :");
  6073. #endif
  6074. //Build up list of socks which are ready for accept read/write without blocking
  6075. for (aindex_t idx = 0; idx < numSocks; idx++)
  6076. {
  6077. SOCKET s = socks.item(idx);
  6078. #ifdef _USE_SELECT
  6079. if (FD_ISSET(s, &fds))
  6080. #else
  6081. if ( (fds[idx].revents) && (!(fds[idx].revents & POLLNVAL)) )
  6082. #endif
  6083. {
  6084. #ifdef _DEBUG
  6085. dbgSB.appendf(" %d",s);
  6086. #endif
  6087. readySocks.append(s);
  6088. if ((int) readySocks.length() == res)
  6089. break;
  6090. }
  6091. }
  6092. #ifdef _DEBUG
  6093. DBGLOG("%s",dbgSB.str());
  6094. #endif
  6095. }
  6096. else if (res == SOCKET_ERROR)
  6097. {
  6098. res = 0; // dont return negative on failure
  6099. int err = ERRNO();
  6100. if (err != JSE_INTR)
  6101. {
  6102. #ifndef _USE_SELECT
  6103. delete [] fds;
  6104. #endif
  6105. throw MakeStringException(-1,"wait_multiple::select/poll error %d", err);
  6106. }
  6107. }
  6108. #ifndef _USE_SELECT
  6109. delete [] fds;
  6110. #endif
  6111. return res;
  6112. }
  6113. //Given a list of sockets, wait until any one or more are ready to be read (wont block)
  6114. //returns 0 if timeout, number of waiting sockets otherwise
  6115. int wait_read_multiple(UnsignedArray &socks, //IN sockets to be checked for readiness
  6116. unsigned timeoutMS, //IN timeout
  6117. UnsignedArray &readySocks) //OUT sockets ready to be read
  6118. {
  6119. return wait_multiple(true, socks, timeoutMS, readySocks);
  6120. }
  6121. int wait_write_multiple(UnsignedArray &socks, //IN sockets to be checked for readiness
  6122. unsigned timeoutMS, //IN timeout
  6123. UnsignedArray &readySocks) //OUT sockets ready to be written
  6124. {
  6125. return wait_multiple(false, socks, timeoutMS, readySocks);
  6126. }